1 /* 2 * Copyright (C) 2006 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 23 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 24 import static java.lang.Thread.currentThread; 25 import static java.util.Arrays.asList; 26 27 import com.google.common.annotations.Beta; 28 import com.google.common.base.Function; 29 import com.google.common.base.Optional; 30 import com.google.common.base.Preconditions; 31 import com.google.common.collect.ImmutableCollection; 32 import com.google.common.collect.ImmutableList; 33 import com.google.common.collect.Lists; 34 import com.google.common.collect.Ordering; 35 import com.google.common.collect.Queues; 36 import com.google.common.collect.Sets; 37 38 import java.lang.reflect.Constructor; 39 import java.lang.reflect.InvocationTargetException; 40 import java.lang.reflect.UndeclaredThrowableException; 41 import java.util.Arrays; 42 import java.util.Collections; 43 import java.util.List; 44 import java.util.Set; 45 import java.util.concurrent.Callable; 46 import java.util.concurrent.CancellationException; 47 import java.util.concurrent.ConcurrentLinkedQueue; 48 import java.util.concurrent.ExecutionException; 49 import java.util.concurrent.Executor; 50 import java.util.concurrent.Future; 51 import java.util.concurrent.RejectedExecutionException; 52 import java.util.concurrent.TimeUnit; 53 import java.util.concurrent.TimeoutException; 54 import java.util.concurrent.atomic.AtomicBoolean; 55 import java.util.concurrent.atomic.AtomicInteger; 56 import java.util.logging.Level; 57 import java.util.logging.Logger; 58 59 import javax.annotation.Nullable; 60 61 /** 62 * Static utility methods pertaining to the {@link Future} interface. 63 * 64 * <p>Many of these methods use the {@link ListenableFuture} API; consult the 65 * Guava User Guide article on <a href= 66 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> 67 * {@code ListenableFuture}</a>. 68 * 69 * @author Kevin Bourrillion 70 * @author Nishant Thakkar 71 * @author Sven Mawson 72 * @since 1.0 73 */ 74 @Beta 75 public final class Futures { Futures()76 private Futures() {} 77 78 /** 79 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 80 * and a {@link Function} that maps from {@link Exception} instances into the 81 * appropriate checked type. 82 * 83 * <p>The given mapping function will be applied to an 84 * {@link InterruptedException}, a {@link CancellationException}, or an 85 * {@link ExecutionException}. 86 * See {@link Future#get()} for details on the exceptions thrown. 87 * 88 * @since 9.0 (source-compatible since 1.0) 89 */ makeChecked( ListenableFuture<V> future, Function<? super Exception, X> mapper)90 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 91 ListenableFuture<V> future, Function<? super Exception, X> mapper) { 92 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 93 } 94 95 private abstract static class ImmediateFuture<V> 96 implements ListenableFuture<V> { 97 98 private static final Logger log = 99 Logger.getLogger(ImmediateFuture.class.getName()); 100 101 @Override addListener(Runnable listener, Executor executor)102 public void addListener(Runnable listener, Executor executor) { 103 checkNotNull(listener, "Runnable was null."); 104 checkNotNull(executor, "Executor was null."); 105 try { 106 executor.execute(listener); 107 } catch (RuntimeException e) { 108 // ListenableFuture's contract is that it will not throw unchecked 109 // exceptions, so log the bad runnable and/or executor and swallow it. 110 log.log(Level.SEVERE, "RuntimeException while executing runnable " 111 + listener + " with executor " + executor, e); 112 } 113 } 114 115 @Override cancel(boolean mayInterruptIfRunning)116 public boolean cancel(boolean mayInterruptIfRunning) { 117 return false; 118 } 119 120 @Override get()121 public abstract V get() throws ExecutionException; 122 123 @Override get(long timeout, TimeUnit unit)124 public V get(long timeout, TimeUnit unit) throws ExecutionException { 125 checkNotNull(unit); 126 return get(); 127 } 128 129 @Override isCancelled()130 public boolean isCancelled() { 131 return false; 132 } 133 134 @Override isDone()135 public boolean isDone() { 136 return true; 137 } 138 } 139 140 private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> { 141 142 @Nullable private final V value; 143 ImmediateSuccessfulFuture(@ullable V value)144 ImmediateSuccessfulFuture(@Nullable V value) { 145 this.value = value; 146 } 147 148 @Override get()149 public V get() { 150 return value; 151 } 152 } 153 154 private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception> 155 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 156 157 @Nullable private final V value; 158 ImmediateSuccessfulCheckedFuture(@ullable V value)159 ImmediateSuccessfulCheckedFuture(@Nullable V value) { 160 this.value = value; 161 } 162 163 @Override get()164 public V get() { 165 return value; 166 } 167 168 @Override checkedGet()169 public V checkedGet() { 170 return value; 171 } 172 173 @Override checkedGet(long timeout, TimeUnit unit)174 public V checkedGet(long timeout, TimeUnit unit) { 175 checkNotNull(unit); 176 return value; 177 } 178 } 179 180 private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> { 181 182 private final Throwable thrown; 183 ImmediateFailedFuture(Throwable thrown)184 ImmediateFailedFuture(Throwable thrown) { 185 this.thrown = thrown; 186 } 187 188 @Override get()189 public V get() throws ExecutionException { 190 throw new ExecutionException(thrown); 191 } 192 } 193 194 private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> { 195 196 private final CancellationException thrown; 197 ImmediateCancelledFuture()198 ImmediateCancelledFuture() { 199 this.thrown = new CancellationException("Immediate cancelled future."); 200 } 201 202 @Override isCancelled()203 public boolean isCancelled() { 204 return true; 205 } 206 207 @Override get()208 public V get() { 209 throw AbstractFuture.cancellationExceptionWithCause( 210 "Task was cancelled.", thrown); 211 } 212 } 213 214 private static class ImmediateFailedCheckedFuture<V, X extends Exception> 215 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 216 217 private final X thrown; 218 ImmediateFailedCheckedFuture(X thrown)219 ImmediateFailedCheckedFuture(X thrown) { 220 this.thrown = thrown; 221 } 222 223 @Override get()224 public V get() throws ExecutionException { 225 throw new ExecutionException(thrown); 226 } 227 228 @Override checkedGet()229 public V checkedGet() throws X { 230 throw thrown; 231 } 232 233 @Override checkedGet(long timeout, TimeUnit unit)234 public V checkedGet(long timeout, TimeUnit unit) throws X { 235 checkNotNull(unit); 236 throw thrown; 237 } 238 } 239 240 /** 241 * Creates a {@code ListenableFuture} which has its value set immediately upon 242 * construction. The getters just return the value. This {@code Future} can't 243 * be canceled or timed out and its {@code isDone()} method always returns 244 * {@code true}. 245 */ immediateFuture(@ullable V value)246 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 247 return new ImmediateSuccessfulFuture<V>(value); 248 } 249 250 /** 251 * Returns a {@code CheckedFuture} which has its value set immediately upon 252 * construction. 253 * 254 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 255 * method always returns {@code true}. Calling {@code get()} or {@code 256 * checkedGet()} will immediately return the provided value. 257 */ 258 public static <V, X extends Exception> CheckedFuture<V, X> immediateCheckedFuture(@ullable V value)259 immediateCheckedFuture(@Nullable V value) { 260 return new ImmediateSuccessfulCheckedFuture<V, X>(value); 261 } 262 263 /** 264 * Returns a {@code ListenableFuture} which has an exception set immediately 265 * upon construction. 266 * 267 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 268 * method always returns {@code true}. Calling {@code get()} will immediately 269 * throw the provided {@code Throwable} wrapped in an {@code 270 * ExecutionException}. 271 */ immediateFailedFuture( Throwable throwable)272 public static <V> ListenableFuture<V> immediateFailedFuture( 273 Throwable throwable) { 274 checkNotNull(throwable); 275 return new ImmediateFailedFuture<V>(throwable); 276 } 277 278 /** 279 * Creates a {@code ListenableFuture} which is cancelled immediately upon 280 * construction, so that {@code isCancelled()} always returns {@code true}. 281 * 282 * @since 14.0 283 */ immediateCancelledFuture()284 public static <V> ListenableFuture<V> immediateCancelledFuture() { 285 return new ImmediateCancelledFuture<V>(); 286 } 287 288 /** 289 * Returns a {@code CheckedFuture} which has an exception set immediately upon 290 * construction. 291 * 292 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 293 * method always returns {@code true}. Calling {@code get()} will immediately 294 * throw the provided {@code Exception} wrapped in an {@code 295 * ExecutionException}, and calling {@code checkedGet()} will throw the 296 * provided exception itself. 297 */ 298 public static <V, X extends Exception> CheckedFuture<V, X> immediateFailedCheckedFuture(X exception)299 immediateFailedCheckedFuture(X exception) { 300 checkNotNull(exception); 301 return new ImmediateFailedCheckedFuture<V, X>(exception); 302 } 303 304 /** 305 * Returns a {@code Future} whose result is taken from the given primary 306 * {@code input} or, if the primary input fails, from the {@code Future} 307 * provided by the {@code fallback}. {@link FutureFallback#create} is not 308 * invoked until the primary input has failed, so if the primary input 309 * succeeds, it is never invoked. If, during the invocation of {@code 310 * fallback}, an exception is thrown, this exception is used as the result of 311 * the output {@code Future}. 312 * 313 * <p>Below is an example of a fallback that returns a default value if an 314 * exception occurs: 315 * 316 * <pre> {@code 317 * ListenableFuture<Integer> fetchCounterFuture = ...; 318 * 319 * // Falling back to a zero counter in case an exception happens when 320 * // processing the RPC to fetch counters. 321 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 322 * fetchCounterFuture, new FutureFallback<Integer>() { 323 * public ListenableFuture<Integer> create(Throwable t) { 324 * // Returning "0" as the default for the counter when the 325 * // exception happens. 326 * return immediateFuture(0); 327 * } 328 * });}</pre> 329 * 330 * <p>The fallback can also choose to propagate the original exception when 331 * desired: 332 * 333 * <pre> {@code 334 * ListenableFuture<Integer> fetchCounterFuture = ...; 335 * 336 * // Falling back to a zero counter only in case the exception was a 337 * // TimeoutException. 338 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 339 * fetchCounterFuture, new FutureFallback<Integer>() { 340 * public ListenableFuture<Integer> create(Throwable t) { 341 * if (t instanceof TimeoutException) { 342 * return immediateFuture(0); 343 * } 344 * return immediateFailedFuture(t); 345 * } 346 * });}</pre> 347 * 348 * <p>Note: If the derived {@code Future} is slow or heavyweight to create 349 * (whether the {@code Future} itself is slow or heavyweight to complete is 350 * irrelevant), consider {@linkplain #withFallback(ListenableFuture, 351 * FutureFallback, Executor) supplying an executor}. If you do not supply an 352 * executor, {@code withFallback} will use a 353 * {@linkplain MoreExecutors#directExecutor direct executor}, which carries 354 * some caveats for heavier operations. For example, the call to {@code 355 * fallback.create} may run on an unpredictable or undesirable thread: 356 * 357 * <ul> 358 * <li>If the input {@code Future} is done at the time {@code withFallback} 359 * is called, {@code withFallback} will call {@code fallback.create} inline. 360 * <li>If the input {@code Future} is not yet done, {@code withFallback} will 361 * schedule {@code fallback.create} to be run by the thread that completes 362 * the input {@code Future}, which may be an internal system thread such as 363 * an RPC network thread. 364 * </ul> 365 * 366 * <p>Also note that, regardless of which thread executes the {@code 367 * fallback.create}, all other registered but unexecuted listeners are 368 * prevented from running during its execution, even if those listeners are 369 * to run in other executors. 370 * 371 * @param input the primary input {@code Future} 372 * @param fallback the {@link FutureFallback} implementation to be called if 373 * {@code input} fails 374 * @since 14.0 375 */ withFallback( ListenableFuture<? extends V> input, FutureFallback<? extends V> fallback)376 public static <V> ListenableFuture<V> withFallback( 377 ListenableFuture<? extends V> input, 378 FutureFallback<? extends V> fallback) { 379 return withFallback(input, fallback, directExecutor()); 380 } 381 382 /** 383 * Returns a {@code Future} whose result is taken from the given primary 384 * {@code input} or, if the primary input fails, from the {@code Future} 385 * provided by the {@code fallback}. {@link FutureFallback#create} is not 386 * invoked until the primary input has failed, so if the primary input 387 * succeeds, it is never invoked. If, during the invocation of {@code 388 * fallback}, an exception is thrown, this exception is used as the result of 389 * the output {@code Future}. 390 * 391 * <p>Below is an example of a fallback that returns a default value if an 392 * exception occurs: 393 * 394 * <pre> {@code 395 * ListenableFuture<Integer> fetchCounterFuture = ...; 396 * 397 * // Falling back to a zero counter in case an exception happens when 398 * // processing the RPC to fetch counters. 399 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 400 * fetchCounterFuture, new FutureFallback<Integer>() { 401 * public ListenableFuture<Integer> create(Throwable t) { 402 * // Returning "0" as the default for the counter when the 403 * // exception happens. 404 * return immediateFuture(0); 405 * } 406 * }, directExecutor());}</pre> 407 * 408 * <p>The fallback can also choose to propagate the original exception when 409 * desired: 410 * 411 * <pre> {@code 412 * ListenableFuture<Integer> fetchCounterFuture = ...; 413 * 414 * // Falling back to a zero counter only in case the exception was a 415 * // TimeoutException. 416 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 417 * fetchCounterFuture, new FutureFallback<Integer>() { 418 * public ListenableFuture<Integer> create(Throwable t) { 419 * if (t instanceof TimeoutException) { 420 * return immediateFuture(0); 421 * } 422 * return immediateFailedFuture(t); 423 * } 424 * }, directExecutor());}</pre> 425 * 426 * <p>When the execution of {@code fallback.create} is fast and lightweight 427 * (though the {@code Future} it returns need not meet these criteria), 428 * consider {@linkplain #withFallback(ListenableFuture, FutureFallback) 429 * omitting the executor} or explicitly specifying {@code 430 * directExecutor}. However, be aware of the caveats documented in the 431 * link above. 432 * 433 * @param input the primary input {@code Future} 434 * @param fallback the {@link FutureFallback} implementation to be called if 435 * {@code input} fails 436 * @param executor the executor that runs {@code fallback} if {@code input} 437 * fails 438 * @since 14.0 439 */ withFallback( ListenableFuture<? extends V> input, FutureFallback<? extends V> fallback, Executor executor)440 public static <V> ListenableFuture<V> withFallback( 441 ListenableFuture<? extends V> input, 442 FutureFallback<? extends V> fallback, Executor executor) { 443 checkNotNull(fallback); 444 return new FallbackFuture<V>(input, fallback, executor); 445 } 446 447 /** 448 * A future that falls back on a second, generated future, in case its 449 * original future fails. 450 */ 451 private static class FallbackFuture<V> extends AbstractFuture<V> { 452 453 private volatile ListenableFuture<? extends V> running; 454 FallbackFuture(ListenableFuture<? extends V> input, final FutureFallback<? extends V> fallback, final Executor executor)455 FallbackFuture(ListenableFuture<? extends V> input, 456 final FutureFallback<? extends V> fallback, 457 final Executor executor) { 458 running = input; 459 addCallback(running, new FutureCallback<V>() { 460 @Override 461 public void onSuccess(V value) { 462 set(value); 463 } 464 465 @Override 466 public void onFailure(Throwable t) { 467 if (isCancelled()) { 468 return; 469 } 470 try { 471 running = fallback.create(t); 472 if (isCancelled()) { // in case cancel called in the meantime 473 running.cancel(wasInterrupted()); 474 return; 475 } 476 addCallback(running, new FutureCallback<V>() { 477 @Override 478 public void onSuccess(V value) { 479 set(value); 480 } 481 482 @Override 483 public void onFailure(Throwable t) { 484 if (running.isCancelled()) { 485 cancel(false); 486 } else { 487 setException(t); 488 } 489 } 490 }, directExecutor()); 491 } catch (Throwable e) { 492 setException(e); 493 } 494 } 495 }, executor); 496 } 497 498 @Override cancel(boolean mayInterruptIfRunning)499 public boolean cancel(boolean mayInterruptIfRunning) { 500 if (super.cancel(mayInterruptIfRunning)) { 501 running.cancel(mayInterruptIfRunning); 502 return true; 503 } 504 return false; 505 } 506 } 507 508 /** 509 * Returns a new {@code ListenableFuture} whose result is asynchronously 510 * derived from the result of the given {@code Future}. More precisely, the 511 * returned {@code Future} takes its result from a {@code Future} produced by 512 * applying the given {@code AsyncFunction} to the result of the original 513 * {@code Future}. Example: 514 * 515 * <pre> {@code 516 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 517 * AsyncFunction<RowKey, QueryResult> queryFunction = 518 * new AsyncFunction<RowKey, QueryResult>() { 519 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 520 * return dataService.read(rowKey); 521 * } 522 * }; 523 * ListenableFuture<QueryResult> queryFuture = 524 * transform(rowKeyFuture, queryFunction);}</pre> 525 * 526 * <p>Note: If the derived {@code Future} is slow or heavyweight to create 527 * (whether the {@code Future} itself is slow or heavyweight to complete is 528 * irrelevant), consider {@linkplain #transform(ListenableFuture, 529 * AsyncFunction, Executor) supplying an executor}. If you do not supply an 530 * executor, {@code transform} will use a 531 * {@linkplain MoreExecutors#directExecutor direct executor}, which carries 532 * some caveats for heavier operations. For example, the call to {@code 533 * function.apply} may run on an unpredictable or undesirable thread: 534 * 535 * <ul> 536 * <li>If the input {@code Future} is done at the time {@code transform} is 537 * called, {@code transform} will call {@code function.apply} inline. 538 * <li>If the input {@code Future} is not yet done, {@code transform} will 539 * schedule {@code function.apply} to be run by the thread that completes the 540 * input {@code Future}, which may be an internal system thread such as an 541 * RPC network thread. 542 * </ul> 543 * 544 * <p>Also note that, regardless of which thread executes the {@code 545 * function.apply}, all other registered but unexecuted listeners are 546 * prevented from running during its execution, even if those listeners are 547 * to run in other executors. 548 * 549 * <p>The returned {@code Future} attempts to keep its cancellation state in 550 * sync with that of the input future and that of the future returned by the 551 * function. That is, if the returned {@code Future} is cancelled, it will 552 * attempt to cancel the other two, and if either of the other two is 553 * cancelled, the returned {@code Future} will receive a callback in which it 554 * will attempt to cancel itself. 555 * 556 * @param input The future to transform 557 * @param function A function to transform the result of the input future 558 * to the result of the output future 559 * @return A future that holds result of the function (if the input succeeded) 560 * or the original input's failure (if not) 561 * @since 11.0 562 */ transform(ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function)563 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 564 AsyncFunction<? super I, ? extends O> function) { 565 ChainingListenableFuture<I, O> output = 566 new ChainingListenableFuture<I, O>(function, input); 567 input.addListener(output, directExecutor()); 568 return output; 569 } 570 571 /** 572 * Returns a new {@code ListenableFuture} whose result is asynchronously 573 * derived from the result of the given {@code Future}. More precisely, the 574 * returned {@code Future} takes its result from a {@code Future} produced by 575 * applying the given {@code AsyncFunction} to the result of the original 576 * {@code Future}. Example: 577 * 578 * <pre> {@code 579 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 580 * AsyncFunction<RowKey, QueryResult> queryFunction = 581 * new AsyncFunction<RowKey, QueryResult>() { 582 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 583 * return dataService.read(rowKey); 584 * } 585 * }; 586 * ListenableFuture<QueryResult> queryFuture = 587 * transform(rowKeyFuture, queryFunction, executor);}</pre> 588 * 589 * <p>The returned {@code Future} attempts to keep its cancellation state in 590 * sync with that of the input future and that of the future returned by the 591 * chain function. That is, if the returned {@code Future} is cancelled, it 592 * will attempt to cancel the other two, and if either of the other two is 593 * cancelled, the returned {@code Future} will receive a callback in which it 594 * will attempt to cancel itself. 595 * 596 * <p>When the execution of {@code function.apply} is fast and lightweight 597 * (though the {@code Future} it returns need not meet these criteria), 598 * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting 599 * the executor} or explicitly specifying {@code directExecutor}. 600 * However, be aware of the caveats documented in the link above. 601 * 602 * @param input The future to transform 603 * @param function A function to transform the result of the input future 604 * to the result of the output future 605 * @param executor Executor to run the function in. 606 * @return A future that holds result of the function (if the input succeeded) 607 * or the original input's failure (if not) 608 * @since 11.0 609 */ transform(ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function, Executor executor)610 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 611 AsyncFunction<? super I, ? extends O> function, 612 Executor executor) { 613 checkNotNull(executor); 614 ChainingListenableFuture<I, O> output = 615 new ChainingListenableFuture<I, O>(function, input); 616 input.addListener(rejectionPropagatingRunnable(output, output, executor), directExecutor()); 617 return output; 618 } 619 620 /** 621 * Returns a Runnable that will invoke the delegate Runnable on the delegate executor, but if the 622 * task is rejected, it will propagate that rejection to the output future. 623 */ rejectionPropagatingRunnable( final AbstractFuture<?> outputFuture, final Runnable delegateTask, final Executor delegateExecutor)624 private static Runnable rejectionPropagatingRunnable( 625 final AbstractFuture<?> outputFuture, 626 final Runnable delegateTask, 627 final Executor delegateExecutor) { 628 return new Runnable() { 629 @Override public void run() { 630 final AtomicBoolean thrownFromDelegate = new AtomicBoolean(true); 631 try { 632 delegateExecutor.execute(new Runnable() { 633 @Override public void run() { 634 thrownFromDelegate.set(false); 635 delegateTask.run(); 636 } 637 }); 638 } catch (RejectedExecutionException e) { 639 if (thrownFromDelegate.get()) { 640 // wrap exception? 641 outputFuture.setException(e); 642 } 643 // otherwise it must have been thrown from a transitive call and the delegate runnable 644 // should have handled it. 645 } 646 } 647 }; 648 } 649 650 /** 651 * Returns a new {@code ListenableFuture} whose result is the product of 652 * applying the given {@code Function} to the result of the given {@code 653 * Future}. Example: 654 * 655 * <pre> {@code 656 * ListenableFuture<QueryResult> queryFuture = ...; 657 * Function<QueryResult, List<Row>> rowsFunction = 658 * new Function<QueryResult, List<Row>>() { 659 * public List<Row> apply(QueryResult queryResult) { 660 * return queryResult.getRows(); 661 * } 662 * }; 663 * ListenableFuture<List<Row>> rowsFuture = 664 * transform(queryFuture, rowsFunction);}</pre> 665 * 666 * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain 667 * #transform(ListenableFuture, Function, Executor) supplying an executor}. 668 * If you do not supply an executor, {@code transform} will use an inline 669 * executor, which carries some caveats for heavier operations. For example, 670 * the call to {@code function.apply} may run on an unpredictable or 671 * undesirable thread: 672 * 673 * <ul> 674 * <li>If the input {@code Future} is done at the time {@code transform} is 675 * called, {@code transform} will call {@code function.apply} inline. 676 * <li>If the input {@code Future} is not yet done, {@code transform} will 677 * schedule {@code function.apply} to be run by the thread that completes the 678 * input {@code Future}, which may be an internal system thread such as an 679 * RPC network thread. 680 * </ul> 681 * 682 * <p>Also note that, regardless of which thread executes the {@code 683 * function.apply}, all other registered but unexecuted listeners are 684 * prevented from running during its execution, even if those listeners are 685 * to run in other executors. 686 * 687 * <p>The returned {@code Future} attempts to keep its cancellation state in 688 * sync with that of the input future. That is, if the returned {@code Future} 689 * is cancelled, it will attempt to cancel the input, and if the input is 690 * cancelled, the returned {@code Future} will receive a callback in which it 691 * will attempt to cancel itself. 692 * 693 * <p>An example use of this method is to convert a serializable object 694 * returned from an RPC into a POJO. 695 * 696 * @param input The future to transform 697 * @param function A Function to transform the results of the provided future 698 * to the results of the returned future. This will be run in the thread 699 * that notifies input it is complete. 700 * @return A future that holds result of the transformation. 701 * @since 9.0 (in 1.0 as {@code compose}) 702 */ 703 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 704 final Function<? super I, ? extends O> function) { 705 checkNotNull(function); 706 ChainingListenableFuture<I, O> output = 707 new ChainingListenableFuture<I, O>(asAsyncFunction(function), input); 708 input.addListener(output, directExecutor()); 709 return output; 710 } 711 712 /** 713 * Returns a new {@code ListenableFuture} whose result is the product of 714 * applying the given {@code Function} to the result of the given {@code 715 * Future}. Example: 716 * 717 * <pre> {@code 718 * ListenableFuture<QueryResult> queryFuture = ...; 719 * Function<QueryResult, List<Row>> rowsFunction = 720 * new Function<QueryResult, List<Row>>() { 721 * public List<Row> apply(QueryResult queryResult) { 722 * return queryResult.getRows(); 723 * } 724 * }; 725 * ListenableFuture<List<Row>> rowsFuture = 726 * transform(queryFuture, rowsFunction, executor);}</pre> 727 * 728 * <p>The returned {@code Future} attempts to keep its cancellation state in 729 * sync with that of the input future. That is, if the returned {@code Future} 730 * is cancelled, it will attempt to cancel the input, and if the input is 731 * cancelled, the returned {@code Future} will receive a callback in which it 732 * will attempt to cancel itself. 733 * 734 * <p>An example use of this method is to convert a serializable object 735 * returned from an RPC into a POJO. 736 * 737 * <p>When the transformation is fast and lightweight, consider {@linkplain 738 * #transform(ListenableFuture, Function) omitting the executor} or 739 * explicitly specifying {@code directExecutor}. However, be aware of the 740 * caveats documented in the link above. 741 * 742 * @param input The future to transform 743 * @param function A Function to transform the results of the provided future 744 * to the results of the returned future. 745 * @param executor Executor to run the function in. 746 * @return A future that holds result of the transformation. 747 * @since 9.0 (in 2.0 as {@code compose}) 748 */ 749 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 750 final Function<? super I, ? extends O> function, Executor executor) { 751 checkNotNull(function); 752 return transform(input, asAsyncFunction(function), executor); 753 } 754 755 /** Wraps the given function as an AsyncFunction. */ 756 private static <I, O> AsyncFunction<I, O> asAsyncFunction( 757 final Function<? super I, ? extends O> function) { 758 return new AsyncFunction<I, O>() { 759 @Override public ListenableFuture<O> apply(I input) { 760 O output = function.apply(input); 761 return immediateFuture(output); 762 } 763 }; 764 } 765 766 /** 767 * Like {@link #transform(ListenableFuture, Function)} except that the 768 * transformation {@code function} is invoked on each call to 769 * {@link Future#get() get()} on the returned future. 770 * 771 * <p>The returned {@code Future} reflects the input's cancellation 772 * state directly, and any attempt to cancel the returned Future is likewise 773 * passed through to the input Future. 774 * 775 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 776 * only apply the timeout to the execution of the underlying {@code Future}, 777 * <em>not</em> to the execution of the transformation function. 778 * 779 * <p>The primary audience of this method is callers of {@code transform} 780 * who don't have a {@code ListenableFuture} available and 781 * do not mind repeated, lazy function evaluation. 782 * 783 * @param input The future to transform 784 * @param function A Function to transform the results of the provided future 785 * to the results of the returned future. 786 * @return A future that returns the result of the transformation. 787 * @since 10.0 788 */ 789 public static <I, O> Future<O> lazyTransform(final Future<I> input, 790 final Function<? super I, ? extends O> function) { 791 checkNotNull(input); 792 checkNotNull(function); 793 return new Future<O>() { 794 795 @Override 796 public boolean cancel(boolean mayInterruptIfRunning) { 797 return input.cancel(mayInterruptIfRunning); 798 } 799 800 @Override 801 public boolean isCancelled() { 802 return input.isCancelled(); 803 } 804 805 @Override 806 public boolean isDone() { 807 return input.isDone(); 808 } 809 810 @Override 811 public O get() throws InterruptedException, ExecutionException { 812 return applyTransformation(input.get()); 813 } 814 815 @Override 816 public O get(long timeout, TimeUnit unit) 817 throws InterruptedException, ExecutionException, TimeoutException { 818 return applyTransformation(input.get(timeout, unit)); 819 } 820 821 private O applyTransformation(I input) throws ExecutionException { 822 try { 823 return function.apply(input); 824 } catch (Throwable t) { 825 throw new ExecutionException(t); 826 } 827 } 828 }; 829 } 830 831 /** 832 * An implementation of {@code ListenableFuture} that also implements 833 * {@code Runnable} so that it can be used to nest ListenableFutures. 834 * Once the passed-in {@code ListenableFuture} is complete, it calls the 835 * passed-in {@code Function} to generate the result. 836 * 837 * <p>For historical reasons, this class has a special case in its exception 838 * handling: If the given {@code AsyncFunction} throws an {@code 839 * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it 840 * and uses its <i>cause</i> as the output future's exception, rather than 841 * using the {@code UndeclaredThrowableException} itself as it would for other 842 * exception types. The reason for this is that {@code Futures.transform} used 843 * to require a {@code Function}, whose {@code apply} method is not allowed to 844 * throw checked exceptions. Nowadays, {@code Futures.transform} has an 845 * overload that accepts an {@code AsyncFunction}, whose {@code apply} method 846 * <i>is</i> allowed to throw checked exception. Users who wish to throw 847 * checked exceptions should use that overload instead, and <a 848 * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we 849 * should remove the {@code UndeclaredThrowableException} special case</a>. 850 */ 851 private static class ChainingListenableFuture<I, O> 852 extends AbstractFuture<O> implements Runnable { 853 854 private AsyncFunction<? super I, ? extends O> function; 855 private ListenableFuture<? extends I> inputFuture; 856 private volatile ListenableFuture<? extends O> outputFuture; 857 858 private ChainingListenableFuture( 859 AsyncFunction<? super I, ? extends O> function, 860 ListenableFuture<? extends I> inputFuture) { 861 this.function = checkNotNull(function); 862 this.inputFuture = checkNotNull(inputFuture); 863 } 864 865 @Override 866 public boolean cancel(boolean mayInterruptIfRunning) { 867 /* 868 * Our additional cancellation work needs to occur even if 869 * !mayInterruptIfRunning, so we can't move it into interruptTask(). 870 */ 871 if (super.cancel(mayInterruptIfRunning)) { 872 // This should never block since only one thread is allowed to cancel 873 // this Future. 874 cancel(inputFuture, mayInterruptIfRunning); 875 cancel(outputFuture, mayInterruptIfRunning); 876 return true; 877 } 878 return false; 879 } 880 881 private void cancel(@Nullable Future<?> future, 882 boolean mayInterruptIfRunning) { 883 if (future != null) { 884 future.cancel(mayInterruptIfRunning); 885 } 886 } 887 888 @Override 889 public void run() { 890 try { 891 I sourceResult; 892 try { 893 sourceResult = getUninterruptibly(inputFuture); 894 } catch (CancellationException e) { 895 // Cancel this future and return. 896 // At this point, inputFuture is cancelled and outputFuture doesn't 897 // exist, so the value of mayInterruptIfRunning is irrelevant. 898 cancel(false); 899 return; 900 } catch (ExecutionException e) { 901 // Set the cause of the exception as this future's exception 902 setException(e.getCause()); 903 return; 904 } 905 906 final ListenableFuture<? extends O> outputFuture = this.outputFuture = 907 Preconditions.checkNotNull(function.apply(sourceResult), 908 "AsyncFunction may not return null."); 909 if (isCancelled()) { 910 outputFuture.cancel(wasInterrupted()); 911 this.outputFuture = null; 912 return; 913 } 914 outputFuture.addListener(new Runnable() { 915 @Override 916 public void run() { 917 try { 918 set(getUninterruptibly(outputFuture)); 919 } catch (CancellationException e) { 920 // Cancel this future and return. 921 // At this point, inputFuture and outputFuture are done, so the 922 // value of mayInterruptIfRunning is irrelevant. 923 cancel(false); 924 return; 925 } catch (ExecutionException e) { 926 // Set the cause of the exception as this future's exception 927 setException(e.getCause()); 928 } finally { 929 // Don't pin inputs beyond completion 930 ChainingListenableFuture.this.outputFuture = null; 931 } 932 } 933 }, directExecutor()); 934 } catch (UndeclaredThrowableException e) { 935 // Set the cause of the exception as this future's exception 936 setException(e.getCause()); 937 } catch (Throwable t) { 938 // This exception is irrelevant in this thread, but useful for the 939 // client 940 setException(t); 941 } finally { 942 // Don't pin inputs beyond completion 943 function = null; 944 inputFuture = null; 945 } 946 } 947 } 948 949 /** 950 * Returns a new {@code ListenableFuture} whose result is the product of 951 * calling {@code get()} on the {@code Future} nested within the given {@code 952 * Future}, effectively chaining the futures one after the other. Example: 953 * 954 * <pre> {@code 955 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 956 * ListenableFuture<String> dereferenced = dereference(nested);}</pre> 957 * 958 * <p>This call has the same cancellation and execution semantics as {@link 959 * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code 960 * Future} attempts to keep its cancellation state in sync with both the 961 * input {@code Future} and the nested {@code Future}. The transformation 962 * is very lightweight and therefore takes place in the same thread (either 963 * the thread that called {@code dereference}, or the thread in which the 964 * dereferenced future completes). 965 * 966 * @param nested The nested future to transform. 967 * @return A future that holds result of the inner future. 968 * @since 13.0 969 */ 970 @SuppressWarnings({"rawtypes", "unchecked"}) 971 public static <V> ListenableFuture<V> dereference( 972 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 973 return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); 974 } 975 976 /** 977 * Helper {@code Function} for {@link #dereference}. 978 */ 979 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 980 new AsyncFunction<ListenableFuture<Object>, Object>() { 981 @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 982 return input; 983 } 984 }; 985 986 /** 987 * Creates a new {@code ListenableFuture} whose value is a list containing the 988 * values of all its input futures, if all succeed. If any input fails, the 989 * returned future fails immediately. 990 * 991 * <p>The list of results is in the same order as the input list. 992 * 993 * <p>Canceling this future will attempt to cancel all the component futures, 994 * and if any of the provided futures fails or is canceled, this one is, 995 * too. 996 * 997 * @param futures futures to combine 998 * @return a future that provides a list of the results of the component 999 * futures 1000 * @since 10.0 1001 */ 1002 @Beta 1003 public static <V> ListenableFuture<List<V>> allAsList( 1004 ListenableFuture<? extends V>... futures) { 1005 return listFuture(ImmutableList.copyOf(futures), true, directExecutor()); 1006 } 1007 1008 /** 1009 * Creates a new {@code ListenableFuture} whose value is a list containing the 1010 * values of all its input futures, if all succeed. If any input fails, the 1011 * returned future fails immediately. 1012 * 1013 * <p>The list of results is in the same order as the input list. 1014 * 1015 * <p>Canceling this future will attempt to cancel all the component futures, 1016 * and if any of the provided futures fails or is canceled, this one is, 1017 * too. 1018 * 1019 * @param futures futures to combine 1020 * @return a future that provides a list of the results of the component 1021 * futures 1022 * @since 10.0 1023 */ 1024 @Beta 1025 public static <V> ListenableFuture<List<V>> allAsList( 1026 Iterable<? extends ListenableFuture<? extends V>> futures) { 1027 return listFuture(ImmutableList.copyOf(futures), true, directExecutor()); 1028 } 1029 1030 private static final class WrappedCombiner<T> implements Callable<T> { 1031 final Callable<T> delegate; 1032 CombinerFuture<T> outputFuture; 1033 1034 WrappedCombiner(Callable<T> delegate) { 1035 this.delegate = checkNotNull(delegate); 1036 } 1037 1038 @Override public T call() throws Exception { 1039 try { 1040 return delegate.call(); 1041 } catch (ExecutionException e) { 1042 outputFuture.setException(e.getCause()); 1043 } catch (CancellationException e) { 1044 outputFuture.cancel(false); 1045 } 1046 // at this point the return value doesn't matter since we already called setException or 1047 // cancel so the future is done. 1048 return null; 1049 } 1050 } 1051 1052 private static final class CombinerFuture<V> extends ListenableFutureTask<V> { 1053 ImmutableList<ListenableFuture<?>> futures; 1054 1055 CombinerFuture(Callable<V> callable, ImmutableList<ListenableFuture<?>> futures) { 1056 super(callable); 1057 this.futures = futures; 1058 } 1059 1060 @Override public boolean cancel(boolean mayInterruptIfRunning) { 1061 ImmutableList<ListenableFuture<?>> futures = this.futures; 1062 if (super.cancel(mayInterruptIfRunning)) { 1063 for (ListenableFuture<?> future : futures) { 1064 future.cancel(mayInterruptIfRunning); 1065 } 1066 return true; 1067 } 1068 return false; 1069 } 1070 1071 @Override protected void done() { 1072 super.done(); 1073 futures = null; 1074 } 1075 1076 @Override protected void setException(Throwable t) { 1077 super.setException(t); 1078 } 1079 } 1080 1081 /** 1082 * Creates a new {@code ListenableFuture} whose result is set from the 1083 * supplied future when it completes. Cancelling the supplied future 1084 * will also cancel the returned future, but cancelling the returned 1085 * future will have no effect on the supplied future. 1086 * 1087 * @since 15.0 1088 */ 1089 public static <V> ListenableFuture<V> nonCancellationPropagating( 1090 ListenableFuture<V> future) { 1091 return new NonCancellationPropagatingFuture<V>(future); 1092 } 1093 1094 /** 1095 * A wrapped future that does not propagate cancellation to its delegate. 1096 */ 1097 private static class NonCancellationPropagatingFuture<V> 1098 extends AbstractFuture<V> { 1099 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) { 1100 checkNotNull(delegate); 1101 addCallback(delegate, new FutureCallback<V>() { 1102 @Override 1103 public void onSuccess(V result) { 1104 set(result); 1105 } 1106 1107 @Override 1108 public void onFailure(Throwable t) { 1109 if (delegate.isCancelled()) { 1110 cancel(false); 1111 } else { 1112 setException(t); 1113 } 1114 } 1115 }, directExecutor()); 1116 } 1117 } 1118 1119 /** 1120 * Creates a new {@code ListenableFuture} whose value is a list containing the 1121 * values of all its successful input futures. The list of results is in the 1122 * same order as the input list, and if any of the provided futures fails or 1123 * is canceled, its corresponding position will contain {@code null} (which is 1124 * indistinguishable from the future having a successful value of 1125 * {@code null}). 1126 * 1127 * <p>Canceling this future will attempt to cancel all the component futures. 1128 * 1129 * @param futures futures to combine 1130 * @return a future that provides a list of the results of the component 1131 * futures 1132 * @since 10.0 1133 */ 1134 @Beta 1135 public static <V> ListenableFuture<List<V>> successfulAsList( 1136 ListenableFuture<? extends V>... futures) { 1137 return listFuture(ImmutableList.copyOf(futures), false, directExecutor()); 1138 } 1139 1140 /** 1141 * Creates a new {@code ListenableFuture} whose value is a list containing the 1142 * values of all its successful input futures. The list of results is in the 1143 * same order as the input list, and if any of the provided futures fails or 1144 * is canceled, its corresponding position will contain {@code null} (which is 1145 * indistinguishable from the future having a successful value of 1146 * {@code null}). 1147 * 1148 * <p>Canceling this future will attempt to cancel all the component futures. 1149 * 1150 * @param futures futures to combine 1151 * @return a future that provides a list of the results of the component 1152 * futures 1153 * @since 10.0 1154 */ 1155 @Beta 1156 public static <V> ListenableFuture<List<V>> successfulAsList( 1157 Iterable<? extends ListenableFuture<? extends V>> futures) { 1158 return listFuture(ImmutableList.copyOf(futures), false, directExecutor()); 1159 } 1160 1161 /** 1162 * Returns a list of delegate futures that correspond to the futures received in the order 1163 * that they complete. Delegate futures return the same value or throw the same exception 1164 * as the corresponding input future returns/throws. 1165 * 1166 * <p>Cancelling a delegate future has no effect on any input future, since the delegate future 1167 * does not correspond to a specific input future until the appropriate number of input 1168 * futures have completed. At that point, it is too late to cancel the input future. 1169 * The input future's result, which cannot be stored into the cancelled delegate future, 1170 * is ignored. 1171 * 1172 * @since 17.0 1173 */ 1174 @Beta 1175 public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder( 1176 Iterable<? extends ListenableFuture<? extends T>> futures) { 1177 // A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an 1178 // ArrayDeque 1179 final ConcurrentLinkedQueue<AsyncSettableFuture<T>> delegates = 1180 Queues.newConcurrentLinkedQueue(); 1181 ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder(); 1182 // Using SerializingExecutor here will ensure that each CompletionOrderListener executes 1183 // atomically and therefore that each returned future is guaranteed to be in completion order. 1184 // N.B. there are some cases where the use of this executor could have possibly surprising 1185 // effects when input futures finish at approximately the same time _and_ the output futures 1186 // have directExecutor listeners. In this situation, the listeners may end up running on a 1187 // different thread than if they were attached to the corresponding input future. We believe 1188 // this to be a negligible cost since: 1189 // 1. Using the directExecutor implies that your callback is safe to run on any thread. 1190 // 2. This would likely only be noticeable if you were doing something expensive or blocking on 1191 // a directExecutor listener on one of the output futures which is an antipattern anyway. 1192 SerializingExecutor executor = new SerializingExecutor(directExecutor()); 1193 for (final ListenableFuture<? extends T> future : futures) { 1194 AsyncSettableFuture<T> delegate = AsyncSettableFuture.create(); 1195 // Must make sure to add the delegate to the queue first in case the future is already done 1196 delegates.add(delegate); 1197 future.addListener(new Runnable() { 1198 @Override public void run() { 1199 delegates.remove().setFuture(future); 1200 } 1201 }, executor); 1202 listBuilder.add(delegate); 1203 } 1204 return listBuilder.build(); 1205 } 1206 1207 /** 1208 * Registers separate success and failure callbacks to be run when the {@code 1209 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1210 * complete} or, if the computation is already complete, immediately. 1211 * 1212 * <p>There is no guaranteed ordering of execution of callbacks, but any 1213 * callback added through this method is guaranteed to be called once the 1214 * computation is complete. 1215 * 1216 * Example: <pre> {@code 1217 * ListenableFuture<QueryResult> future = ...; 1218 * addCallback(future, 1219 * new FutureCallback<QueryResult> { 1220 * public void onSuccess(QueryResult result) { 1221 * storeInCache(result); 1222 * } 1223 * public void onFailure(Throwable t) { 1224 * reportError(t); 1225 * } 1226 * });}</pre> 1227 * 1228 * <p>Note: If the callback is slow or heavyweight, consider {@linkplain 1229 * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an 1230 * executor}. If you do not supply an executor, {@code addCallback} will use 1231 * a {@linkplain MoreExecutors#directExecutor direct executor}, which carries 1232 * some caveats for heavier operations. For example, the callback may run on 1233 * an unpredictable or undesirable thread: 1234 * 1235 * <ul> 1236 * <li>If the input {@code Future} is done at the time {@code addCallback} is 1237 * called, {@code addCallback} will execute the callback inline. 1238 * <li>If the input {@code Future} is not yet done, {@code addCallback} will 1239 * schedule the callback to be run by the thread that completes the input 1240 * {@code Future}, which may be an internal system thread such as an RPC 1241 * network thread. 1242 * </ul> 1243 * 1244 * <p>Also note that, regardless of which thread executes the callback, all 1245 * other registered but unexecuted listeners are prevented from running 1246 * during its execution, even if those listeners are to run in other 1247 * executors. 1248 * 1249 * <p>For a more general interface to attach a completion listener to a 1250 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1251 * 1252 * @param future The future attach the callback to. 1253 * @param callback The callback to invoke when {@code future} is completed. 1254 * @since 10.0 1255 */ 1256 public static <V> void addCallback(ListenableFuture<V> future, 1257 FutureCallback<? super V> callback) { 1258 addCallback(future, callback, directExecutor()); 1259 } 1260 1261 /** 1262 * Registers separate success and failure callbacks to be run when the {@code 1263 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1264 * complete} or, if the computation is already complete, immediately. 1265 * 1266 * <p>The callback is run in {@code executor}. 1267 * There is no guaranteed ordering of execution of callbacks, but any 1268 * callback added through this method is guaranteed to be called once the 1269 * computation is complete. 1270 * 1271 * Example: <pre> {@code 1272 * ListenableFuture<QueryResult> future = ...; 1273 * Executor e = ... 1274 * addCallback(future, 1275 * new FutureCallback<QueryResult> { 1276 * public void onSuccess(QueryResult result) { 1277 * storeInCache(result); 1278 * } 1279 * public void onFailure(Throwable t) { 1280 * reportError(t); 1281 * } 1282 * }, e);}</pre> 1283 * 1284 * <p>When the callback is fast and lightweight, consider {@linkplain 1285 * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or 1286 * explicitly specifying {@code directExecutor}. However, be aware of the 1287 * caveats documented in the link above. 1288 * 1289 * <p>For a more general interface to attach a completion listener to a 1290 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1291 * 1292 * @param future The future attach the callback to. 1293 * @param callback The callback to invoke when {@code future} is completed. 1294 * @param executor The executor to run {@code callback} when the future 1295 * completes. 1296 * @since 10.0 1297 */ 1298 public static <V> void addCallback(final ListenableFuture<V> future, 1299 final FutureCallback<? super V> callback, Executor executor) { 1300 Preconditions.checkNotNull(callback); 1301 Runnable callbackListener = new Runnable() { 1302 @Override 1303 public void run() { 1304 final V value; 1305 try { 1306 // TODO(user): (Before Guava release), validate that this 1307 // is the thing for IE. 1308 value = getUninterruptibly(future); 1309 } catch (ExecutionException e) { 1310 callback.onFailure(e.getCause()); 1311 return; 1312 } catch (RuntimeException e) { 1313 callback.onFailure(e); 1314 return; 1315 } catch (Error e) { 1316 callback.onFailure(e); 1317 return; 1318 } 1319 callback.onSuccess(value); 1320 } 1321 }; 1322 future.addListener(callbackListener, executor); 1323 } 1324 1325 /** 1326 * Returns the result of {@link Future#get()}, converting most exceptions to a 1327 * new instance of the given checked exception type. This reduces boilerplate 1328 * for a common use of {@code Future} in which it is unnecessary to 1329 * programmatically distinguish between exception types or to extract other 1330 * information from the exception instance. 1331 * 1332 * <p>Exceptions from {@code Future.get} are treated as follows: 1333 * <ul> 1334 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1335 * {@code X} if the cause is a checked exception, an {@link 1336 * UncheckedExecutionException} if the cause is a {@code 1337 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1338 * {@code Error}. 1339 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1340 * restoring the interrupt). 1341 * <li>Any {@link CancellationException} is propagated untouched, as is any 1342 * other {@link RuntimeException} (though {@code get} implementations are 1343 * discouraged from throwing such exceptions). 1344 * </ul> 1345 * 1346 * <p>The overall principle is to continue to treat every checked exception as a 1347 * checked exception, every unchecked exception as an unchecked exception, and 1348 * every error as an error. In addition, the cause of any {@code 1349 * ExecutionException} is wrapped in order to ensure that the new stack trace 1350 * matches that of the current thread. 1351 * 1352 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1353 * public constructor that accepts zero or more arguments, all of type {@code 1354 * String} or {@code Throwable} (preferring constructors with at least one 1355 * {@code String}) and calling the constructor via reflection. If the 1356 * exception did not already have a cause, one is set by calling {@link 1357 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1358 * {@code IllegalArgumentException} is thrown. 1359 * 1360 * @throws X if {@code get} throws any checked exception except for an {@code 1361 * ExecutionException} whose cause is not itself a checked exception 1362 * @throws UncheckedExecutionException if {@code get} throws an {@code 1363 * ExecutionException} with a {@code RuntimeException} as its cause 1364 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1365 * with an {@code Error} as its cause 1366 * @throws CancellationException if {@code get} throws a {@code 1367 * CancellationException} 1368 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1369 * RuntimeException} or does not have a suitable constructor 1370 * @since 10.0 1371 */ 1372 public static <V, X extends Exception> V get( 1373 Future<V> future, Class<X> exceptionClass) throws X { 1374 checkNotNull(future); 1375 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1376 "Futures.get exception type (%s) must not be a RuntimeException", 1377 exceptionClass); 1378 try { 1379 return future.get(); 1380 } catch (InterruptedException e) { 1381 currentThread().interrupt(); 1382 throw newWithCause(exceptionClass, e); 1383 } catch (ExecutionException e) { 1384 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1385 throw new AssertionError(); 1386 } 1387 } 1388 1389 /** 1390 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1391 * exceptions to a new instance of the given checked exception type. This 1392 * reduces boilerplate for a common use of {@code Future} in which it is 1393 * unnecessary to programmatically distinguish between exception types or to 1394 * extract other information from the exception instance. 1395 * 1396 * <p>Exceptions from {@code Future.get} are treated as follows: 1397 * <ul> 1398 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1399 * {@code X} if the cause is a checked exception, an {@link 1400 * UncheckedExecutionException} if the cause is a {@code 1401 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1402 * {@code Error}. 1403 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1404 * restoring the interrupt). 1405 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1406 * <li>Any {@link CancellationException} is propagated untouched, as is any 1407 * other {@link RuntimeException} (though {@code get} implementations are 1408 * discouraged from throwing such exceptions). 1409 * </ul> 1410 * 1411 * <p>The overall principle is to continue to treat every checked exception as a 1412 * checked exception, every unchecked exception as an unchecked exception, and 1413 * every error as an error. In addition, the cause of any {@code 1414 * ExecutionException} is wrapped in order to ensure that the new stack trace 1415 * matches that of the current thread. 1416 * 1417 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1418 * public constructor that accepts zero or more arguments, all of type {@code 1419 * String} or {@code Throwable} (preferring constructors with at least one 1420 * {@code String}) and calling the constructor via reflection. If the 1421 * exception did not already have a cause, one is set by calling {@link 1422 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1423 * {@code IllegalArgumentException} is thrown. 1424 * 1425 * @throws X if {@code get} throws any checked exception except for an {@code 1426 * ExecutionException} whose cause is not itself a checked exception 1427 * @throws UncheckedExecutionException if {@code get} throws an {@code 1428 * ExecutionException} with a {@code RuntimeException} as its cause 1429 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1430 * with an {@code Error} as its cause 1431 * @throws CancellationException if {@code get} throws a {@code 1432 * CancellationException} 1433 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1434 * RuntimeException} or does not have a suitable constructor 1435 * @since 10.0 1436 */ 1437 public static <V, X extends Exception> V get( 1438 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 1439 throws X { 1440 checkNotNull(future); 1441 checkNotNull(unit); 1442 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1443 "Futures.get exception type (%s) must not be a RuntimeException", 1444 exceptionClass); 1445 try { 1446 return future.get(timeout, unit); 1447 } catch (InterruptedException e) { 1448 currentThread().interrupt(); 1449 throw newWithCause(exceptionClass, e); 1450 } catch (TimeoutException e) { 1451 throw newWithCause(exceptionClass, e); 1452 } catch (ExecutionException e) { 1453 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1454 throw new AssertionError(); 1455 } 1456 } 1457 1458 private static <X extends Exception> void wrapAndThrowExceptionOrError( 1459 Throwable cause, Class<X> exceptionClass) throws X { 1460 if (cause instanceof Error) { 1461 throw new ExecutionError((Error) cause); 1462 } 1463 if (cause instanceof RuntimeException) { 1464 throw new UncheckedExecutionException(cause); 1465 } 1466 throw newWithCause(exceptionClass, cause); 1467 } 1468 1469 /** 1470 * Returns the result of calling {@link Future#get()} uninterruptibly on a 1471 * task known not to throw a checked exception. This makes {@code Future} more 1472 * suitable for lightweight, fast-running tasks that, barring bugs in the 1473 * code, will not fail. This gives it exception-handling behavior similar to 1474 * that of {@code ForkJoinTask.join}. 1475 * 1476 * <p>Exceptions from {@code Future.get} are treated as follows: 1477 * <ul> 1478 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1479 * {@link UncheckedExecutionException} (if the cause is an {@code 1480 * Exception}) or {@link ExecutionError} (if the cause is an {@code 1481 * Error}). 1482 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 1483 * call. The interrupt is restored before {@code getUnchecked} returns. 1484 * <li>Any {@link CancellationException} is propagated untouched. So is any 1485 * other {@link RuntimeException} ({@code get} implementations are 1486 * discouraged from throwing such exceptions). 1487 * </ul> 1488 * 1489 * <p>The overall principle is to eliminate all checked exceptions: to loop to 1490 * avoid {@code InterruptedException}, to pass through {@code 1491 * CancellationException}, and to wrap any exception from the underlying 1492 * computation in an {@code UncheckedExecutionException} or {@code 1493 * ExecutionError}. 1494 * 1495 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 1496 * {@link Uninterruptibles#getUninterruptibly(Future)}. 1497 * 1498 * @throws UncheckedExecutionException if {@code get} throws an {@code 1499 * ExecutionException} with an {@code Exception} as its cause 1500 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1501 * with an {@code Error} as its cause 1502 * @throws CancellationException if {@code get} throws a {@code 1503 * CancellationException} 1504 * @since 10.0 1505 */ 1506 public static <V> V getUnchecked(Future<V> future) { 1507 checkNotNull(future); 1508 try { 1509 return getUninterruptibly(future); 1510 } catch (ExecutionException e) { 1511 wrapAndThrowUnchecked(e.getCause()); 1512 throw new AssertionError(); 1513 } 1514 } 1515 1516 private static void wrapAndThrowUnchecked(Throwable cause) { 1517 if (cause instanceof Error) { 1518 throw new ExecutionError((Error) cause); 1519 } 1520 /* 1521 * It's a non-Error, non-Exception Throwable. From my survey of such 1522 * classes, I believe that most users intended to extend Exception, so we'll 1523 * treat it like an Exception. 1524 */ 1525 throw new UncheckedExecutionException(cause); 1526 } 1527 1528 /* 1529 * TODO(user): FutureChecker interface for these to be static methods on? If 1530 * so, refer to it in the (static-method) Futures.get documentation 1531 */ 1532 1533 /* 1534 * Arguably we don't need a timed getUnchecked because any operation slow 1535 * enough to require a timeout is heavyweight enough to throw a checked 1536 * exception and therefore be inappropriate to use with getUnchecked. Further, 1537 * it's not clear that converting the checked TimeoutException to a 1538 * RuntimeException -- especially to an UncheckedExecutionException, since it 1539 * wasn't thrown by the computation -- makes sense, and if we don't convert 1540 * it, the user still has to write a try-catch block. 1541 * 1542 * If you think you would use this method, let us know. 1543 */ 1544 1545 private static <X extends Exception> X newWithCause( 1546 Class<X> exceptionClass, Throwable cause) { 1547 // getConstructors() guarantees this as long as we don't modify the array. 1548 @SuppressWarnings("unchecked") 1549 List<Constructor<X>> constructors = 1550 (List) Arrays.asList(exceptionClass.getConstructors()); 1551 for (Constructor<X> constructor : preferringStrings(constructors)) { 1552 @Nullable X instance = newFromConstructor(constructor, cause); 1553 if (instance != null) { 1554 if (instance.getCause() == null) { 1555 instance.initCause(cause); 1556 } 1557 return instance; 1558 } 1559 } 1560 throw new IllegalArgumentException( 1561 "No appropriate constructor for exception of type " + exceptionClass 1562 + " in response to chained exception", cause); 1563 } 1564 1565 private static <X extends Exception> List<Constructor<X>> 1566 preferringStrings(List<Constructor<X>> constructors) { 1567 return WITH_STRING_PARAM_FIRST.sortedCopy(constructors); 1568 } 1569 1570 private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST = 1571 Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() { 1572 @Override public Boolean apply(Constructor<?> input) { 1573 return asList(input.getParameterTypes()).contains(String.class); 1574 } 1575 }).reverse(); 1576 1577 @Nullable private static <X> X newFromConstructor( 1578 Constructor<X> constructor, Throwable cause) { 1579 Class<?>[] paramTypes = constructor.getParameterTypes(); 1580 Object[] params = new Object[paramTypes.length]; 1581 for (int i = 0; i < paramTypes.length; i++) { 1582 Class<?> paramType = paramTypes[i]; 1583 if (paramType.equals(String.class)) { 1584 params[i] = cause.toString(); 1585 } else if (paramType.equals(Throwable.class)) { 1586 params[i] = cause; 1587 } else { 1588 return null; 1589 } 1590 } 1591 try { 1592 return constructor.newInstance(params); 1593 } catch (IllegalArgumentException e) { 1594 return null; 1595 } catch (InstantiationException e) { 1596 return null; 1597 } catch (IllegalAccessException e) { 1598 return null; 1599 } catch (InvocationTargetException e) { 1600 return null; 1601 } 1602 } 1603 1604 private interface FutureCombiner<V, C> { 1605 C combine(List<Optional<V>> values); 1606 } 1607 1608 private static class CombinedFuture<V, C> extends AbstractFuture<C> { 1609 private static final Logger logger = 1610 Logger.getLogger(CombinedFuture.class.getName()); 1611 1612 ImmutableCollection<? extends ListenableFuture<? extends V>> futures; 1613 final boolean allMustSucceed; 1614 final AtomicInteger remaining; 1615 FutureCombiner<V, C> combiner; 1616 List<Optional<V>> values; 1617 final Object seenExceptionsLock = new Object(); 1618 Set<Throwable> seenExceptions; 1619 1620 CombinedFuture( 1621 ImmutableCollection<? extends ListenableFuture<? extends V>> futures, 1622 boolean allMustSucceed, Executor listenerExecutor, 1623 FutureCombiner<V, C> combiner) { 1624 this.futures = futures; 1625 this.allMustSucceed = allMustSucceed; 1626 this.remaining = new AtomicInteger(futures.size()); 1627 this.combiner = combiner; 1628 this.values = Lists.newArrayListWithCapacity(futures.size()); 1629 init(listenerExecutor); 1630 } 1631 1632 /** 1633 * Must be called at the end of the constructor. 1634 */ 1635 protected void init(final Executor listenerExecutor) { 1636 // First, schedule cleanup to execute when the Future is done. 1637 addListener(new Runnable() { 1638 @Override 1639 public void run() { 1640 // Cancel all the component futures. 1641 if (CombinedFuture.this.isCancelled()) { 1642 for (ListenableFuture<?> future : CombinedFuture.this.futures) { 1643 future.cancel(CombinedFuture.this.wasInterrupted()); 1644 } 1645 } 1646 1647 // Let go of the memory held by other futures 1648 CombinedFuture.this.futures = null; 1649 1650 // By now the values array has either been set as the Future's value, 1651 // or (in case of failure) is no longer useful. 1652 CombinedFuture.this.values = null; 1653 1654 // The combiner may also hold state, so free that as well 1655 CombinedFuture.this.combiner = null; 1656 } 1657 }, directExecutor()); 1658 1659 // Now begin the "real" initialization. 1660 1661 // Corner case: List is empty. 1662 if (futures.isEmpty()) { 1663 set(combiner.combine(ImmutableList.<Optional<V>>of())); 1664 return; 1665 } 1666 1667 // Populate the results list with null initially. 1668 for (int i = 0; i < futures.size(); ++i) { 1669 values.add(null); 1670 } 1671 1672 // Register a listener on each Future in the list to update 1673 // the state of this future. 1674 // Note that if all the futures on the list are done prior to completing 1675 // this loop, the last call to addListener() will callback to 1676 // setOneValue(), transitively call our cleanup listener, and set 1677 // this.futures to null. 1678 // This is not actually a problem, since the foreach only needs 1679 // this.futures to be non-null at the beginning of the loop. 1680 int i = 0; 1681 for (final ListenableFuture<? extends V> listenable : futures) { 1682 final int index = i++; 1683 listenable.addListener(new Runnable() { 1684 @Override 1685 public void run() { 1686 setOneValue(index, listenable); 1687 } 1688 }, listenerExecutor); 1689 } 1690 } 1691 1692 /** 1693 * Fails this future with the given Throwable if {@link #allMustSucceed} is 1694 * true. Also, logs the throwable if it is an {@link Error} or if 1695 * {@link #allMustSucceed} is {@code true}, the throwable did not cause 1696 * this future to fail, and it is the first time we've seen that particular Throwable. 1697 */ 1698 private void setExceptionAndMaybeLog(Throwable throwable) { 1699 boolean visibleFromOutputFuture = false; 1700 boolean firstTimeSeeingThisException = true; 1701 if (allMustSucceed) { 1702 // As soon as the first one fails, throw the exception up. 1703 // The result of all other inputs is then ignored. 1704 visibleFromOutputFuture = super.setException(throwable); 1705 1706 synchronized (seenExceptionsLock) { 1707 if (seenExceptions == null) { 1708 seenExceptions = Sets.newHashSet(); 1709 } 1710 firstTimeSeeingThisException = seenExceptions.add(throwable); 1711 } 1712 } 1713 1714 if (throwable instanceof Error 1715 || (allMustSucceed && !visibleFromOutputFuture && firstTimeSeeingThisException)) { 1716 logger.log(Level.SEVERE, "input future failed.", throwable); 1717 } 1718 } 1719 1720 /** 1721 * Sets the value at the given index to that of the given future. 1722 */ 1723 private void setOneValue(int index, Future<? extends V> future) { 1724 List<Optional<V>> localValues = values; 1725 // TODO(user): This check appears to be redundant since values is 1726 // assigned null only after the future completes. However, values 1727 // is not volatile so it may be possible for us to observe the changes 1728 // to these two values in a different order... which I think is why 1729 // we need to check both. Clear up this craziness either by making 1730 // values volatile or proving that it doesn't need to be for some other 1731 // reason. 1732 if (isDone() || localValues == null) { 1733 // Some other future failed or has been cancelled, causing this one to 1734 // also be cancelled or have an exception set. This should only happen 1735 // if allMustSucceed is true or if the output itself has been 1736 // cancelled. 1737 checkState(allMustSucceed || isCancelled(), 1738 "Future was done before all dependencies completed"); 1739 } 1740 1741 try { 1742 checkState(future.isDone(), 1743 "Tried to set value from future which is not done"); 1744 V returnValue = getUninterruptibly(future); 1745 if (localValues != null) { 1746 localValues.set(index, Optional.fromNullable(returnValue)); 1747 } 1748 } catch (CancellationException e) { 1749 if (allMustSucceed) { 1750 // Set ourselves as cancelled. Let the input futures keep running 1751 // as some of them may be used elsewhere. 1752 cancel(false); 1753 } 1754 } catch (ExecutionException e) { 1755 setExceptionAndMaybeLog(e.getCause()); 1756 } catch (Throwable t) { 1757 setExceptionAndMaybeLog(t); 1758 } finally { 1759 int newRemaining = remaining.decrementAndGet(); 1760 checkState(newRemaining >= 0, "Less than 0 remaining futures"); 1761 if (newRemaining == 0) { 1762 FutureCombiner<V, C> localCombiner = combiner; 1763 if (localCombiner != null && localValues != null) { 1764 set(localCombiner.combine(localValues)); 1765 } else { 1766 checkState(isDone()); 1767 } 1768 } 1769 } 1770 } 1771 } 1772 1773 /** Used for {@link #allAsList} and {@link #successfulAsList}. */ 1774 private static <V> ListenableFuture<List<V>> listFuture( 1775 ImmutableList<ListenableFuture<? extends V>> futures, 1776 boolean allMustSucceed, Executor listenerExecutor) { 1777 return new CombinedFuture<V, List<V>>( 1778 futures, allMustSucceed, listenerExecutor, 1779 new FutureCombiner<V, List<V>>() { 1780 @Override 1781 public List<V> combine(List<Optional<V>> values) { 1782 List<V> result = Lists.newArrayList(); 1783 for (Optional<V> element : values) { 1784 result.add(element != null ? element.orNull() : null); 1785 } 1786 return Collections.unmodifiableList(result); 1787 } 1788 }); 1789 } 1790 1791 /** 1792 * A checked future that uses a function to map from exceptions to the 1793 * appropriate checked type. 1794 */ 1795 private static class MappingCheckedFuture<V, X extends Exception> extends 1796 AbstractCheckedFuture<V, X> { 1797 1798 final Function<? super Exception, X> mapper; 1799 1800 MappingCheckedFuture(ListenableFuture<V> delegate, 1801 Function<? super Exception, X> mapper) { 1802 super(delegate); 1803 1804 this.mapper = checkNotNull(mapper); 1805 } 1806 1807 @Override 1808 protected X mapException(Exception e) { 1809 return mapper.apply(e); 1810 } 1811 } 1812 } 1813