• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2006 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
24 import static java.lang.Thread.currentThread;
25 import static java.util.Arrays.asList;
26 
27 import com.google.common.annotations.Beta;
28 import com.google.common.base.Function;
29 import com.google.common.base.Optional;
30 import com.google.common.base.Preconditions;
31 import com.google.common.collect.ImmutableCollection;
32 import com.google.common.collect.ImmutableList;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Ordering;
35 import com.google.common.collect.Queues;
36 import com.google.common.collect.Sets;
37 
38 import java.lang.reflect.Constructor;
39 import java.lang.reflect.InvocationTargetException;
40 import java.lang.reflect.UndeclaredThrowableException;
41 import java.util.Arrays;
42 import java.util.Collections;
43 import java.util.List;
44 import java.util.Set;
45 import java.util.concurrent.Callable;
46 import java.util.concurrent.CancellationException;
47 import java.util.concurrent.ConcurrentLinkedQueue;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.Executor;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.RejectedExecutionException;
52 import java.util.concurrent.TimeUnit;
53 import java.util.concurrent.TimeoutException;
54 import java.util.concurrent.atomic.AtomicBoolean;
55 import java.util.concurrent.atomic.AtomicInteger;
56 import java.util.logging.Level;
57 import java.util.logging.Logger;
58 
59 import javax.annotation.Nullable;
60 
/**
 * Static utility methods pertaining to the {@link Future} interface.
 *
 * <p>Many of these methods use the {@link ListenableFuture} API; consult the
 * Guava User Guide article on <a href=
 * "https://github.com/google/guava/wiki/ListenableFutureExplained">
 * {@code ListenableFuture}</a>.
 *
 * @author Kevin Bourrillion
 * @author Nishant Thakkar
 * @author Sven Mawson
 * @since 1.0
 */
74 @Beta
75 public final class Futures {
Futures()76   private Futures() {}
77 
78   /**
79    * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
80    * and a {@link Function} that maps from {@link Exception} instances into the
81    * appropriate checked type.
82    *
83    * <p>The given mapping function will be applied to an
84    * {@link InterruptedException}, a {@link CancellationException}, or an
85    * {@link ExecutionException}.
86    * See {@link Future#get()} for details on the exceptions thrown.
87    *
88    * @since 9.0 (source-compatible since 1.0)
89    */
makeChecked( ListenableFuture<V> future, Function<? super Exception, X> mapper)90   public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
91       ListenableFuture<V> future, Function<? super Exception, X> mapper) {
92     return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
93   }
94 
95   private abstract static class ImmediateFuture<V>
96       implements ListenableFuture<V> {
97 
98     private static final Logger log =
99         Logger.getLogger(ImmediateFuture.class.getName());
100 
101     @Override
addListener(Runnable listener, Executor executor)102     public void addListener(Runnable listener, Executor executor) {
103       checkNotNull(listener, "Runnable was null.");
104       checkNotNull(executor, "Executor was null.");
105       try {
106         executor.execute(listener);
107       } catch (RuntimeException e) {
108         // ListenableFuture's contract is that it will not throw unchecked
109         // exceptions, so log the bad runnable and/or executor and swallow it.
110         log.log(Level.SEVERE, "RuntimeException while executing runnable "
111             + listener + " with executor " + executor, e);
112       }
113     }
114 
115     @Override
cancel(boolean mayInterruptIfRunning)116     public boolean cancel(boolean mayInterruptIfRunning) {
117       return false;
118     }
119 
120     @Override
get()121     public abstract V get() throws ExecutionException;
122 
123     @Override
get(long timeout, TimeUnit unit)124     public V get(long timeout, TimeUnit unit) throws ExecutionException {
125       checkNotNull(unit);
126       return get();
127     }
128 
129     @Override
isCancelled()130     public boolean isCancelled() {
131       return false;
132     }
133 
134     @Override
isDone()135     public boolean isDone() {
136       return true;
137     }
138   }
139 
140   private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
141 
142     @Nullable private final V value;
143 
ImmediateSuccessfulFuture(@ullable V value)144     ImmediateSuccessfulFuture(@Nullable V value) {
145       this.value = value;
146     }
147 
148     @Override
get()149     public V get() {
150       return value;
151     }
152   }
153 
154   private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
155       extends ImmediateFuture<V> implements CheckedFuture<V, X> {
156 
157     @Nullable private final V value;
158 
ImmediateSuccessfulCheckedFuture(@ullable V value)159     ImmediateSuccessfulCheckedFuture(@Nullable V value) {
160       this.value = value;
161     }
162 
163     @Override
get()164     public V get() {
165       return value;
166     }
167 
168     @Override
checkedGet()169     public V checkedGet() {
170       return value;
171     }
172 
173     @Override
checkedGet(long timeout, TimeUnit unit)174     public V checkedGet(long timeout, TimeUnit unit) {
175       checkNotNull(unit);
176       return value;
177     }
178   }
179 
180   private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
181 
182     private final Throwable thrown;
183 
ImmediateFailedFuture(Throwable thrown)184     ImmediateFailedFuture(Throwable thrown) {
185       this.thrown = thrown;
186     }
187 
188     @Override
get()189     public V get() throws ExecutionException {
190       throw new ExecutionException(thrown);
191     }
192   }
193 
194   private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
195 
196     private final CancellationException thrown;
197 
ImmediateCancelledFuture()198     ImmediateCancelledFuture() {
199       this.thrown = new CancellationException("Immediate cancelled future.");
200     }
201 
202     @Override
isCancelled()203     public boolean isCancelled() {
204       return true;
205     }
206 
207     @Override
get()208     public V get() {
209       throw AbstractFuture.cancellationExceptionWithCause(
210           "Task was cancelled.", thrown);
211     }
212   }
213 
214   private static class ImmediateFailedCheckedFuture<V, X extends Exception>
215       extends ImmediateFuture<V> implements CheckedFuture<V, X> {
216 
217     private final X thrown;
218 
ImmediateFailedCheckedFuture(X thrown)219     ImmediateFailedCheckedFuture(X thrown) {
220       this.thrown = thrown;
221     }
222 
223     @Override
get()224     public V get() throws ExecutionException {
225       throw new ExecutionException(thrown);
226     }
227 
228     @Override
checkedGet()229     public V checkedGet() throws X {
230       throw thrown;
231     }
232 
233     @Override
checkedGet(long timeout, TimeUnit unit)234     public V checkedGet(long timeout, TimeUnit unit) throws X {
235       checkNotNull(unit);
236       throw thrown;
237     }
238   }
239 
240   /**
241    * Creates a {@code ListenableFuture} which has its value set immediately upon
242    * construction. The getters just return the value. This {@code Future} can't
243    * be canceled or timed out and its {@code isDone()} method always returns
244    * {@code true}.
245    */
immediateFuture(@ullable V value)246   public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
247     return new ImmediateSuccessfulFuture<V>(value);
248   }
249 
250   /**
251    * Returns a {@code CheckedFuture} which has its value set immediately upon
252    * construction.
253    *
254    * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
255    * method always returns {@code true}. Calling {@code get()} or {@code
256    * checkedGet()} will immediately return the provided value.
257    */
258   public static <V, X extends Exception> CheckedFuture<V, X>
immediateCheckedFuture(@ullable V value)259       immediateCheckedFuture(@Nullable V value) {
260     return new ImmediateSuccessfulCheckedFuture<V, X>(value);
261   }
262 
263   /**
264    * Returns a {@code ListenableFuture} which has an exception set immediately
265    * upon construction.
266    *
267    * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
268    * method always returns {@code true}. Calling {@code get()} will immediately
269    * throw the provided {@code Throwable} wrapped in an {@code
270    * ExecutionException}.
271    */
immediateFailedFuture( Throwable throwable)272   public static <V> ListenableFuture<V> immediateFailedFuture(
273       Throwable throwable) {
274     checkNotNull(throwable);
275     return new ImmediateFailedFuture<V>(throwable);
276   }
277 
278   /**
279    * Creates a {@code ListenableFuture} which is cancelled immediately upon
280    * construction, so that {@code isCancelled()} always returns {@code true}.
281    *
282    * @since 14.0
283    */
immediateCancelledFuture()284   public static <V> ListenableFuture<V> immediateCancelledFuture() {
285     return new ImmediateCancelledFuture<V>();
286   }
287 
288   /**
289    * Returns a {@code CheckedFuture} which has an exception set immediately upon
290    * construction.
291    *
292    * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
293    * method always returns {@code true}. Calling {@code get()} will immediately
294    * throw the provided {@code Exception} wrapped in an {@code
295    * ExecutionException}, and calling {@code checkedGet()} will throw the
296    * provided exception itself.
297    */
298   public static <V, X extends Exception> CheckedFuture<V, X>
immediateFailedCheckedFuture(X exception)299       immediateFailedCheckedFuture(X exception) {
300     checkNotNull(exception);
301     return new ImmediateFailedCheckedFuture<V, X>(exception);
302   }
303 
304   /**
305    * Returns a {@code Future} whose result is taken from the given primary
306    * {@code input} or, if the primary input fails, from the {@code Future}
307    * provided by the {@code fallback}. {@link FutureFallback#create} is not
308    * invoked until the primary input has failed, so if the primary input
309    * succeeds, it is never invoked. If, during the invocation of {@code
310    * fallback}, an exception is thrown, this exception is used as the result of
311    * the output {@code Future}.
312    *
313    * <p>Below is an example of a fallback that returns a default value if an
314    * exception occurs:
315    *
316    * <pre>   {@code
317    *   ListenableFuture<Integer> fetchCounterFuture = ...;
318    *
319    *   // Falling back to a zero counter in case an exception happens when
320    *   // processing the RPC to fetch counters.
321    *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
322    *       fetchCounterFuture, new FutureFallback<Integer>() {
323    *         public ListenableFuture<Integer> create(Throwable t) {
324    *           // Returning "0" as the default for the counter when the
325    *           // exception happens.
326    *           return immediateFuture(0);
327    *         }
328    *       });}</pre>
329    *
330    * <p>The fallback can also choose to propagate the original exception when
331    * desired:
332    *
333    * <pre>   {@code
334    *   ListenableFuture<Integer> fetchCounterFuture = ...;
335    *
336    *   // Falling back to a zero counter only in case the exception was a
337    *   // TimeoutException.
338    *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
339    *       fetchCounterFuture, new FutureFallback<Integer>() {
340    *         public ListenableFuture<Integer> create(Throwable t) {
341    *           if (t instanceof TimeoutException) {
342    *             return immediateFuture(0);
343    *           }
344    *           return immediateFailedFuture(t);
345    *         }
346    *       });}</pre>
347    *
348    * <p>Note: If the derived {@code Future} is slow or heavyweight to create
349    * (whether the {@code Future} itself is slow or heavyweight to complete is
350    * irrelevant), consider {@linkplain #withFallback(ListenableFuture,
351    * FutureFallback, Executor) supplying an executor}. If you do not supply an
352    * executor, {@code withFallback} will use a
353    * {@linkplain MoreExecutors#directExecutor direct executor}, which carries
354    * some caveats for heavier operations. For example, the call to {@code
355    * fallback.create} may run on an unpredictable or undesirable thread:
356    *
357    * <ul>
358    * <li>If the input {@code Future} is done at the time {@code withFallback}
359    * is called, {@code withFallback} will call {@code fallback.create} inline.
360    * <li>If the input {@code Future} is not yet done, {@code withFallback} will
361    * schedule {@code fallback.create} to be run by the thread that completes
362    * the input {@code Future}, which may be an internal system thread such as
363    * an RPC network thread.
364    * </ul>
365    *
366    * <p>Also note that, regardless of which thread executes the {@code
367    * fallback.create}, all other registered but unexecuted listeners are
368    * prevented from running during its execution, even if those listeners are
369    * to run in other executors.
370    *
371    * @param input the primary input {@code Future}
372    * @param fallback the {@link FutureFallback} implementation to be called if
373    *     {@code input} fails
374    * @since 14.0
375    */
withFallback( ListenableFuture<? extends V> input, FutureFallback<? extends V> fallback)376   public static <V> ListenableFuture<V> withFallback(
377       ListenableFuture<? extends V> input,
378       FutureFallback<? extends V> fallback) {
379     return withFallback(input, fallback, directExecutor());
380   }
381 
382   /**
383    * Returns a {@code Future} whose result is taken from the given primary
384    * {@code input} or, if the primary input fails, from the {@code Future}
385    * provided by the {@code fallback}. {@link FutureFallback#create} is not
386    * invoked until the primary input has failed, so if the primary input
387    * succeeds, it is never invoked. If, during the invocation of {@code
388    * fallback}, an exception is thrown, this exception is used as the result of
389    * the output {@code Future}.
390    *
391    * <p>Below is an example of a fallback that returns a default value if an
392    * exception occurs:
393    *
394    * <pre>   {@code
395    *   ListenableFuture<Integer> fetchCounterFuture = ...;
396    *
397    *   // Falling back to a zero counter in case an exception happens when
398    *   // processing the RPC to fetch counters.
399    *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
400    *       fetchCounterFuture, new FutureFallback<Integer>() {
401    *         public ListenableFuture<Integer> create(Throwable t) {
402    *           // Returning "0" as the default for the counter when the
403    *           // exception happens.
404    *           return immediateFuture(0);
405    *         }
406    *       }, directExecutor());}</pre>
407    *
408    * <p>The fallback can also choose to propagate the original exception when
409    * desired:
410    *
411    * <pre>   {@code
412    *   ListenableFuture<Integer> fetchCounterFuture = ...;
413    *
414    *   // Falling back to a zero counter only in case the exception was a
415    *   // TimeoutException.
416    *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
417    *       fetchCounterFuture, new FutureFallback<Integer>() {
418    *         public ListenableFuture<Integer> create(Throwable t) {
419    *           if (t instanceof TimeoutException) {
420    *             return immediateFuture(0);
421    *           }
422    *           return immediateFailedFuture(t);
423    *         }
424    *       }, directExecutor());}</pre>
425    *
426    * <p>When the execution of {@code fallback.create} is fast and lightweight
427    * (though the {@code Future} it returns need not meet these criteria),
428    * consider {@linkplain #withFallback(ListenableFuture, FutureFallback)
429    * omitting the executor} or explicitly specifying {@code
430    * directExecutor}. However, be aware of the caveats documented in the
431    * link above.
432    *
433    * @param input the primary input {@code Future}
434    * @param fallback the {@link FutureFallback} implementation to be called if
435    *     {@code input} fails
436    * @param executor the executor that runs {@code fallback} if {@code input}
437    *     fails
438    * @since 14.0
439    */
withFallback( ListenableFuture<? extends V> input, FutureFallback<? extends V> fallback, Executor executor)440   public static <V> ListenableFuture<V> withFallback(
441       ListenableFuture<? extends V> input,
442       FutureFallback<? extends V> fallback, Executor executor) {
443     checkNotNull(fallback);
444     return new FallbackFuture<V>(input, fallback, executor);
445   }
446 
447   /**
448    * A future that falls back on a second, generated future, in case its
449    * original future fails.
450    */
451   private static class FallbackFuture<V> extends AbstractFuture<V> {
452 
453     private volatile ListenableFuture<? extends V> running;
454 
FallbackFuture(ListenableFuture<? extends V> input, final FutureFallback<? extends V> fallback, final Executor executor)455     FallbackFuture(ListenableFuture<? extends V> input,
456         final FutureFallback<? extends V> fallback,
457         final Executor executor) {
458       running = input;
459       addCallback(running, new FutureCallback<V>() {
460         @Override
461         public void onSuccess(V value) {
462           set(value);
463         }
464 
465         @Override
466         public void onFailure(Throwable t) {
467           if (isCancelled()) {
468             return;
469           }
470           try {
471             running = fallback.create(t);
472             if (isCancelled()) { // in case cancel called in the meantime
473               running.cancel(wasInterrupted());
474               return;
475             }
476             addCallback(running, new FutureCallback<V>() {
477               @Override
478               public void onSuccess(V value) {
479                 set(value);
480               }
481 
482               @Override
483               public void onFailure(Throwable t) {
484                 if (running.isCancelled()) {
485                   cancel(false);
486                 } else {
487                   setException(t);
488                 }
489               }
490             }, directExecutor());
491           } catch (Throwable e) {
492             setException(e);
493           }
494         }
495       }, executor);
496     }
497 
498     @Override
cancel(boolean mayInterruptIfRunning)499     public boolean cancel(boolean mayInterruptIfRunning) {
500       if (super.cancel(mayInterruptIfRunning)) {
501         running.cancel(mayInterruptIfRunning);
502         return true;
503       }
504       return false;
505     }
506   }
507 
508   /**
509    * Returns a new {@code ListenableFuture} whose result is asynchronously
510    * derived from the result of the given {@code Future}. More precisely, the
511    * returned {@code Future} takes its result from a {@code Future} produced by
512    * applying the given {@code AsyncFunction} to the result of the original
513    * {@code Future}. Example:
514    *
515    * <pre>   {@code
516    *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
517    *   AsyncFunction<RowKey, QueryResult> queryFunction =
518    *       new AsyncFunction<RowKey, QueryResult>() {
519    *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
520    *           return dataService.read(rowKey);
521    *         }
522    *       };
523    *   ListenableFuture<QueryResult> queryFuture =
524    *       transform(rowKeyFuture, queryFunction);}</pre>
525    *
526    * <p>Note: If the derived {@code Future} is slow or heavyweight to create
527    * (whether the {@code Future} itself is slow or heavyweight to complete is
528    * irrelevant), consider {@linkplain #transform(ListenableFuture,
529    * AsyncFunction, Executor) supplying an executor}. If you do not supply an
530    * executor, {@code transform} will use a
531    * {@linkplain MoreExecutors#directExecutor direct executor}, which carries
532    * some caveats for heavier operations. For example, the call to {@code
533    * function.apply} may run on an unpredictable or undesirable thread:
534    *
535    * <ul>
536    * <li>If the input {@code Future} is done at the time {@code transform} is
537    * called, {@code transform} will call {@code function.apply} inline.
538    * <li>If the input {@code Future} is not yet done, {@code transform} will
539    * schedule {@code function.apply} to be run by the thread that completes the
540    * input {@code Future}, which may be an internal system thread such as an
541    * RPC network thread.
542    * </ul>
543    *
544    * <p>Also note that, regardless of which thread executes the {@code
545    * function.apply}, all other registered but unexecuted listeners are
546    * prevented from running during its execution, even if those listeners are
547    * to run in other executors.
548    *
549    * <p>The returned {@code Future} attempts to keep its cancellation state in
550    * sync with that of the input future and that of the future returned by the
551    * function. That is, if the returned {@code Future} is cancelled, it will
552    * attempt to cancel the other two, and if either of the other two is
553    * cancelled, the returned {@code Future} will receive a callback in which it
554    * will attempt to cancel itself.
555    *
556    * @param input The future to transform
557    * @param function A function to transform the result of the input future
558    *     to the result of the output future
559    * @return A future that holds result of the function (if the input succeeded)
560    *     or the original input's failure (if not)
561    * @since 11.0
562    */
transform(ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function)563   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
564       AsyncFunction<? super I, ? extends O> function) {
565     ChainingListenableFuture<I, O> output =
566         new ChainingListenableFuture<I, O>(function, input);
567     input.addListener(output, directExecutor());
568     return output;
569   }
570 
571   /**
572    * Returns a new {@code ListenableFuture} whose result is asynchronously
573    * derived from the result of the given {@code Future}. More precisely, the
574    * returned {@code Future} takes its result from a {@code Future} produced by
575    * applying the given {@code AsyncFunction} to the result of the original
576    * {@code Future}. Example:
577    *
578    * <pre>   {@code
579    *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
580    *   AsyncFunction<RowKey, QueryResult> queryFunction =
581    *       new AsyncFunction<RowKey, QueryResult>() {
582    *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
583    *           return dataService.read(rowKey);
584    *         }
585    *       };
586    *   ListenableFuture<QueryResult> queryFuture =
587    *       transform(rowKeyFuture, queryFunction, executor);}</pre>
588    *
589    * <p>The returned {@code Future} attempts to keep its cancellation state in
590    * sync with that of the input future and that of the future returned by the
591    * chain function. That is, if the returned {@code Future} is cancelled, it
592    * will attempt to cancel the other two, and if either of the other two is
593    * cancelled, the returned {@code Future} will receive a callback in which it
594    * will attempt to cancel itself.
595    *
596    * <p>When the execution of {@code function.apply} is fast and lightweight
597    * (though the {@code Future} it returns need not meet these criteria),
598    * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
599    * the executor} or explicitly specifying {@code directExecutor}.
600    * However, be aware of the caveats documented in the link above.
601    *
602    * @param input The future to transform
603    * @param function A function to transform the result of the input future
604    *     to the result of the output future
605    * @param executor Executor to run the function in.
606    * @return A future that holds result of the function (if the input succeeded)
607    *     or the original input's failure (if not)
608    * @since 11.0
609    */
transform(ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function, Executor executor)610   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
611       AsyncFunction<? super I, ? extends O> function,
612       Executor executor) {
613     checkNotNull(executor);
614     ChainingListenableFuture<I, O> output =
615         new ChainingListenableFuture<I, O>(function, input);
616     input.addListener(rejectionPropagatingRunnable(output, output, executor), directExecutor());
617     return output;
618   }
619 
620   /**
621    * Returns a Runnable that will invoke the delegate Runnable on the delegate executor, but if the
622    * task is rejected, it will propagate that rejection to the output future.
623    */
rejectionPropagatingRunnable( final AbstractFuture<?> outputFuture, final Runnable delegateTask, final Executor delegateExecutor)624   private static Runnable rejectionPropagatingRunnable(
625       final AbstractFuture<?> outputFuture,
626       final Runnable delegateTask,
627       final Executor delegateExecutor) {
628     return new Runnable() {
629       @Override public void run() {
630         final AtomicBoolean thrownFromDelegate = new AtomicBoolean(true);
631         try {
632           delegateExecutor.execute(new Runnable() {
633             @Override public void run() {
634               thrownFromDelegate.set(false);
635               delegateTask.run();
636             }
637           });
638         } catch (RejectedExecutionException e) {
639           if (thrownFromDelegate.get()) {
640             // wrap exception?
641             outputFuture.setException(e);
642           }
643           // otherwise it must have been thrown from a transitive call and the delegate runnable
644           // should have handled it.
645         }
646       }
647     };
648   }
649 
650   /**
651    * Returns a new {@code ListenableFuture} whose result is the product of
652    * applying the given {@code Function} to the result of the given {@code
653    * Future}. Example:
654    *
655    * <pre>   {@code
656    *   ListenableFuture<QueryResult> queryFuture = ...;
657    *   Function<QueryResult, List<Row>> rowsFunction =
658    *       new Function<QueryResult, List<Row>>() {
659    *         public List<Row> apply(QueryResult queryResult) {
660    *           return queryResult.getRows();
661    *         }
662    *       };
663    *   ListenableFuture<List<Row>> rowsFuture =
664    *       transform(queryFuture, rowsFunction);}</pre>
665    *
666    * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain
667    * #transform(ListenableFuture, Function, Executor) supplying an executor}.
668    * If you do not supply an executor, {@code transform} will use an inline
669    * executor, which carries some caveats for heavier operations.  For example,
670    * the call to {@code function.apply} may run on an unpredictable or
671    * undesirable thread:
672    *
673    * <ul>
674    * <li>If the input {@code Future} is done at the time {@code transform} is
675    * called, {@code transform} will call {@code function.apply} inline.
676    * <li>If the input {@code Future} is not yet done, {@code transform} will
677    * schedule {@code function.apply} to be run by the thread that completes the
678    * input {@code Future}, which may be an internal system thread such as an
679    * RPC network thread.
680    * </ul>
681    *
682    * <p>Also note that, regardless of which thread executes the {@code
683    * function.apply}, all other registered but unexecuted listeners are
684    * prevented from running during its execution, even if those listeners are
685    * to run in other executors.
686    *
687    * <p>The returned {@code Future} attempts to keep its cancellation state in
688    * sync with that of the input future. That is, if the returned {@code Future}
689    * is cancelled, it will attempt to cancel the input, and if the input is
690    * cancelled, the returned {@code Future} will receive a callback in which it
691    * will attempt to cancel itself.
692    *
693    * <p>An example use of this method is to convert a serializable object
694    * returned from an RPC into a POJO.
695    *
696    * @param input The future to transform
697    * @param function A Function to transform the results of the provided future
698    *     to the results of the returned future.  This will be run in the thread
699    *     that notifies input it is complete.
700    * @return A future that holds result of the transformation.
701    * @since 9.0 (in 1.0 as {@code compose})
702    */
703   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
704       final Function<? super I, ? extends O> function) {
705     checkNotNull(function);
706     ChainingListenableFuture<I, O> output =
707         new ChainingListenableFuture<I, O>(asAsyncFunction(function), input);
708     input.addListener(output, directExecutor());
709     return output;
710   }
711 
712   /**
713    * Returns a new {@code ListenableFuture} whose result is the product of
714    * applying the given {@code Function} to the result of the given {@code
715    * Future}. Example:
716    *
717    * <pre>   {@code
718    *   ListenableFuture<QueryResult> queryFuture = ...;
719    *   Function<QueryResult, List<Row>> rowsFunction =
720    *       new Function<QueryResult, List<Row>>() {
721    *         public List<Row> apply(QueryResult queryResult) {
722    *           return queryResult.getRows();
723    *         }
724    *       };
725    *   ListenableFuture<List<Row>> rowsFuture =
726    *       transform(queryFuture, rowsFunction, executor);}</pre>
727    *
728    * <p>The returned {@code Future} attempts to keep its cancellation state in
729    * sync with that of the input future. That is, if the returned {@code Future}
730    * is cancelled, it will attempt to cancel the input, and if the input is
731    * cancelled, the returned {@code Future} will receive a callback in which it
732    * will attempt to cancel itself.
733    *
734    * <p>An example use of this method is to convert a serializable object
735    * returned from an RPC into a POJO.
736    *
737    * <p>When the transformation is fast and lightweight, consider {@linkplain
738    * #transform(ListenableFuture, Function) omitting the executor} or
739    * explicitly specifying {@code directExecutor}. However, be aware of the
740    * caveats documented in the link above.
741    *
742    * @param input The future to transform
743    * @param function A Function to transform the results of the provided future
744    *     to the results of the returned future.
745    * @param executor Executor to run the function in.
746    * @return A future that holds result of the transformation.
747    * @since 9.0 (in 2.0 as {@code compose})
748    */
749   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
750       final Function<? super I, ? extends O> function, Executor executor) {
751     checkNotNull(function);
752     return transform(input, asAsyncFunction(function), executor);
753   }
754 
755   /** Wraps the given function as an AsyncFunction. */
756   private static <I, O> AsyncFunction<I, O> asAsyncFunction(
757       final Function<? super I, ? extends O> function) {
758     return new AsyncFunction<I, O>() {
759       @Override public ListenableFuture<O> apply(I input) {
760         O output = function.apply(input);
761         return immediateFuture(output);
762       }
763     };
764   }
765 
766   /**
767    * Like {@link #transform(ListenableFuture, Function)} except that the
768    * transformation {@code function} is invoked on each call to
769    * {@link Future#get() get()} on the returned future.
770    *
771    * <p>The returned {@code Future} reflects the input's cancellation
772    * state directly, and any attempt to cancel the returned Future is likewise
773    * passed through to the input Future.
774    *
775    * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
776    * only apply the timeout to the execution of the underlying {@code Future},
777    * <em>not</em> to the execution of the transformation function.
778    *
779    * <p>The primary audience of this method is callers of {@code transform}
780    * who don't have a {@code ListenableFuture} available and
781    * do not mind repeated, lazy function evaluation.
782    *
783    * @param input The future to transform
784    * @param function A Function to transform the results of the provided future
785    *     to the results of the returned future.
786    * @return A future that returns the result of the transformation.
787    * @since 10.0
788    */
789   public static <I, O> Future<O> lazyTransform(final Future<I> input,
790       final Function<? super I, ? extends O> function) {
791     checkNotNull(input);
792     checkNotNull(function);
793     return new Future<O>() {
794 
795       @Override
796       public boolean cancel(boolean mayInterruptIfRunning) {
797         return input.cancel(mayInterruptIfRunning);
798       }
799 
800       @Override
801       public boolean isCancelled() {
802         return input.isCancelled();
803       }
804 
805       @Override
806       public boolean isDone() {
807         return input.isDone();
808       }
809 
810       @Override
811       public O get() throws InterruptedException, ExecutionException {
812         return applyTransformation(input.get());
813       }
814 
815       @Override
816       public O get(long timeout, TimeUnit unit)
817           throws InterruptedException, ExecutionException, TimeoutException {
818         return applyTransformation(input.get(timeout, unit));
819       }
820 
821       private O applyTransformation(I input) throws ExecutionException {
822         try {
823           return function.apply(input);
824         } catch (Throwable t) {
825           throw new ExecutionException(t);
826         }
827       }
828     };
829   }
830 
831   /**
832    * An implementation of {@code ListenableFuture} that also implements
833    * {@code Runnable} so that it can be used to nest ListenableFutures.
834    * Once the passed-in {@code ListenableFuture} is complete, it calls the
835    * passed-in {@code Function} to generate the result.
836    *
837    * <p>For historical reasons, this class has a special case in its exception
838    * handling: If the given {@code AsyncFunction} throws an {@code
839    * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it
840    * and uses its <i>cause</i> as the output future's exception, rather than
841    * using the {@code UndeclaredThrowableException} itself as it would for other
842    * exception types. The reason for this is that {@code Futures.transform} used
843    * to require a {@code Function}, whose {@code apply} method is not allowed to
844    * throw checked exceptions. Nowadays, {@code Futures.transform} has an
845    * overload that accepts an {@code AsyncFunction}, whose {@code apply} method
846    * <i>is</i> allowed to throw checked exception. Users who wish to throw
847    * checked exceptions should use that overload instead, and <a
848    * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we
849    * should remove the {@code UndeclaredThrowableException} special case</a>.
850    */
851   private static class ChainingListenableFuture<I, O>
852       extends AbstractFuture<O> implements Runnable {
853 
854     private AsyncFunction<? super I, ? extends O> function;
855     private ListenableFuture<? extends I> inputFuture;
856     private volatile ListenableFuture<? extends O> outputFuture;
857 
858     private ChainingListenableFuture(
859         AsyncFunction<? super I, ? extends O> function,
860         ListenableFuture<? extends I> inputFuture) {
861       this.function = checkNotNull(function);
862       this.inputFuture = checkNotNull(inputFuture);
863     }
864 
865     @Override
866     public boolean cancel(boolean mayInterruptIfRunning) {
867       /*
868        * Our additional cancellation work needs to occur even if
869        * !mayInterruptIfRunning, so we can't move it into interruptTask().
870        */
871       if (super.cancel(mayInterruptIfRunning)) {
872         // This should never block since only one thread is allowed to cancel
873         // this Future.
874         cancel(inputFuture, mayInterruptIfRunning);
875         cancel(outputFuture, mayInterruptIfRunning);
876         return true;
877       }
878       return false;
879     }
880 
881     private void cancel(@Nullable Future<?> future,
882         boolean mayInterruptIfRunning) {
883       if (future != null) {
884         future.cancel(mayInterruptIfRunning);
885       }
886     }
887 
888     @Override
889     public void run() {
890       try {
891         I sourceResult;
892         try {
893           sourceResult = getUninterruptibly(inputFuture);
894         } catch (CancellationException e) {
895           // Cancel this future and return.
896           // At this point, inputFuture is cancelled and outputFuture doesn't
897           // exist, so the value of mayInterruptIfRunning is irrelevant.
898           cancel(false);
899           return;
900         } catch (ExecutionException e) {
901           // Set the cause of the exception as this future's exception
902           setException(e.getCause());
903           return;
904         }
905 
906         final ListenableFuture<? extends O> outputFuture = this.outputFuture =
907             Preconditions.checkNotNull(function.apply(sourceResult),
908                 "AsyncFunction may not return null.");
909         if (isCancelled()) {
910           outputFuture.cancel(wasInterrupted());
911           this.outputFuture = null;
912           return;
913         }
914         outputFuture.addListener(new Runnable() {
915             @Override
916             public void run() {
917               try {
918                 set(getUninterruptibly(outputFuture));
919               } catch (CancellationException e) {
920                 // Cancel this future and return.
921                 // At this point, inputFuture and outputFuture are done, so the
922                 // value of mayInterruptIfRunning is irrelevant.
923                 cancel(false);
924                 return;
925               } catch (ExecutionException e) {
926                 // Set the cause of the exception as this future's exception
927                 setException(e.getCause());
928               } finally {
929                 // Don't pin inputs beyond completion
930                 ChainingListenableFuture.this.outputFuture = null;
931               }
932             }
933           }, directExecutor());
934       } catch (UndeclaredThrowableException e) {
935         // Set the cause of the exception as this future's exception
936         setException(e.getCause());
937       } catch (Throwable t) {
938         // This exception is irrelevant in this thread, but useful for the
939         // client
940         setException(t);
941       } finally {
942         // Don't pin inputs beyond completion
943         function = null;
944         inputFuture = null;
945       }
946     }
947   }
948 
949   /**
950    * Returns a new {@code ListenableFuture} whose result is the product of
951    * calling {@code get()} on the {@code Future} nested within the given {@code
952    * Future}, effectively chaining the futures one after the other.  Example:
953    *
954    * <pre>   {@code
955    *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
956    *   ListenableFuture<String> dereferenced = dereference(nested);}</pre>
957    *
958    * <p>This call has the same cancellation and execution semantics as {@link
959    * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
960    * Future} attempts to keep its cancellation state in sync with both the
961    * input {@code Future} and the nested {@code Future}.  The transformation
962    * is very lightweight and therefore takes place in the same thread (either
963    * the thread that called {@code dereference}, or the thread in which the
964    * dereferenced future completes).
965    *
966    * @param nested The nested future to transform.
967    * @return A future that holds result of the inner future.
968    * @since 13.0
969    */
970   @SuppressWarnings({"rawtypes", "unchecked"})
971   public static <V> ListenableFuture<V> dereference(
972       ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
973     return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
974   }
975 
976   /**
977    * Helper {@code Function} for {@link #dereference}.
978    */
979   private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
980       new AsyncFunction<ListenableFuture<Object>, Object>() {
981         @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
982           return input;
983         }
984       };
985 
986   /**
987    * Creates a new {@code ListenableFuture} whose value is a list containing the
988    * values of all its input futures, if all succeed. If any input fails, the
989    * returned future fails immediately.
990    *
991    * <p>The list of results is in the same order as the input list.
992    *
993    * <p>Canceling this future will attempt to cancel all the component futures,
994    * and if any of the provided futures fails or is canceled, this one is,
995    * too.
996    *
997    * @param futures futures to combine
998    * @return a future that provides a list of the results of the component
999    *         futures
1000    * @since 10.0
1001    */
1002   @Beta
1003   public static <V> ListenableFuture<List<V>> allAsList(
1004       ListenableFuture<? extends V>... futures) {
1005     return listFuture(ImmutableList.copyOf(futures), true, directExecutor());
1006   }
1007 
1008   /**
1009    * Creates a new {@code ListenableFuture} whose value is a list containing the
1010    * values of all its input futures, if all succeed. If any input fails, the
1011    * returned future fails immediately.
1012    *
1013    * <p>The list of results is in the same order as the input list.
1014    *
1015    * <p>Canceling this future will attempt to cancel all the component futures,
1016    * and if any of the provided futures fails or is canceled, this one is,
1017    * too.
1018    *
1019    * @param futures futures to combine
1020    * @return a future that provides a list of the results of the component
1021    *         futures
1022    * @since 10.0
1023    */
1024   @Beta
1025   public static <V> ListenableFuture<List<V>> allAsList(
1026       Iterable<? extends ListenableFuture<? extends V>> futures) {
1027     return listFuture(ImmutableList.copyOf(futures), true, directExecutor());
1028   }
1029 
1030   private static final class WrappedCombiner<T> implements Callable<T> {
1031     final Callable<T> delegate;
1032     CombinerFuture<T> outputFuture;
1033 
1034     WrappedCombiner(Callable<T> delegate) {
1035       this.delegate = checkNotNull(delegate);
1036     }
1037 
1038     @Override public T call() throws Exception {
1039       try {
1040         return delegate.call();
1041       } catch (ExecutionException e) {
1042         outputFuture.setException(e.getCause());
1043       } catch (CancellationException e) {
1044         outputFuture.cancel(false);
1045       }
1046       // at this point the return value doesn't matter since we already called setException or
1047       // cancel so the future is done.
1048       return null;
1049     }
1050   }
1051 
1052   private static final class CombinerFuture<V> extends ListenableFutureTask<V> {
1053     ImmutableList<ListenableFuture<?>> futures;
1054 
1055     CombinerFuture(Callable<V> callable, ImmutableList<ListenableFuture<?>> futures) {
1056       super(callable);
1057       this.futures = futures;
1058     }
1059 
1060     @Override public boolean cancel(boolean mayInterruptIfRunning) {
1061       ImmutableList<ListenableFuture<?>> futures = this.futures;
1062       if (super.cancel(mayInterruptIfRunning)) {
1063         for (ListenableFuture<?> future : futures) {
1064           future.cancel(mayInterruptIfRunning);
1065         }
1066         return true;
1067       }
1068       return false;
1069     }
1070 
1071     @Override protected void done() {
1072       super.done();
1073       futures = null;
1074     }
1075 
1076     @Override protected void setException(Throwable t) {
1077       super.setException(t);
1078     }
1079   }
1080 
1081   /**
1082    * Creates a new {@code ListenableFuture} whose result is set from the
1083    * supplied future when it completes.  Cancelling the supplied future
1084    * will also cancel the returned future, but cancelling the returned
1085    * future will have no effect on the supplied future.
1086    *
1087    * @since 15.0
1088    */
1089   public static <V> ListenableFuture<V> nonCancellationPropagating(
1090       ListenableFuture<V> future) {
1091     return new NonCancellationPropagatingFuture<V>(future);
1092   }
1093 
1094   /**
1095    * A wrapped future that does not propagate cancellation to its delegate.
1096    */
1097   private static class NonCancellationPropagatingFuture<V>
1098       extends AbstractFuture<V> {
1099     NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) {
1100       checkNotNull(delegate);
1101       addCallback(delegate, new FutureCallback<V>() {
1102         @Override
1103         public void onSuccess(V result) {
1104           set(result);
1105         }
1106 
1107         @Override
1108         public void onFailure(Throwable t) {
1109           if (delegate.isCancelled()) {
1110             cancel(false);
1111           } else {
1112             setException(t);
1113           }
1114         }
1115       }, directExecutor());
1116     }
1117   }
1118 
1119   /**
1120    * Creates a new {@code ListenableFuture} whose value is a list containing the
1121    * values of all its successful input futures. The list of results is in the
1122    * same order as the input list, and if any of the provided futures fails or
1123    * is canceled, its corresponding position will contain {@code null} (which is
1124    * indistinguishable from the future having a successful value of
1125    * {@code null}).
1126    *
1127    * <p>Canceling this future will attempt to cancel all the component futures.
1128    *
1129    * @param futures futures to combine
1130    * @return a future that provides a list of the results of the component
1131    *         futures
1132    * @since 10.0
1133    */
1134   @Beta
1135   public static <V> ListenableFuture<List<V>> successfulAsList(
1136       ListenableFuture<? extends V>... futures) {
1137     return listFuture(ImmutableList.copyOf(futures), false, directExecutor());
1138   }
1139 
1140   /**
1141    * Creates a new {@code ListenableFuture} whose value is a list containing the
1142    * values of all its successful input futures. The list of results is in the
1143    * same order as the input list, and if any of the provided futures fails or
1144    * is canceled, its corresponding position will contain {@code null} (which is
1145    * indistinguishable from the future having a successful value of
1146    * {@code null}).
1147    *
1148    * <p>Canceling this future will attempt to cancel all the component futures.
1149    *
1150    * @param futures futures to combine
1151    * @return a future that provides a list of the results of the component
1152    *         futures
1153    * @since 10.0
1154    */
1155   @Beta
1156   public static <V> ListenableFuture<List<V>> successfulAsList(
1157       Iterable<? extends ListenableFuture<? extends V>> futures) {
1158     return listFuture(ImmutableList.copyOf(futures), false, directExecutor());
1159   }
1160 
1161   /**
1162    * Returns a list of delegate futures that correspond to the futures received in the order
1163    * that they complete. Delegate futures return the same value or throw the same exception
1164    * as the corresponding input future returns/throws.
1165    *
1166    * <p>Cancelling a delegate future has no effect on any input future, since the delegate future
1167    * does not correspond to a specific input future until the appropriate number of input
1168    * futures have completed. At that point, it is too late to cancel the input future.
1169    * The input future's result, which cannot be stored into the cancelled delegate future,
1170    * is ignored.
1171    *
1172    * @since 17.0
1173    */
1174   @Beta
1175   public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
1176       Iterable<? extends ListenableFuture<? extends T>> futures) {
1177     // A CLQ may be overkill here.  We could save some pointers/memory by synchronizing on an
1178     // ArrayDeque
1179     final ConcurrentLinkedQueue<AsyncSettableFuture<T>> delegates =
1180         Queues.newConcurrentLinkedQueue();
1181     ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
1182     // Using SerializingExecutor here will ensure that each CompletionOrderListener executes
1183     // atomically and therefore that each returned future is guaranteed to be in completion order.
1184     // N.B. there are some cases where the use of this executor could have possibly surprising
1185     // effects when input futures finish at approximately the same time _and_ the output futures
1186     // have directExecutor listeners. In this situation, the listeners may end up running on a
1187     // different thread than if they were attached to the corresponding input future.  We believe
1188     // this to be a negligible cost since:
1189     // 1. Using the directExecutor implies that your callback is safe to run on any thread.
1190     // 2. This would likely only be noticeable if you were doing something expensive or blocking on
1191     //    a directExecutor listener on one of the output futures which is an antipattern anyway.
1192     SerializingExecutor executor = new SerializingExecutor(directExecutor());
1193     for (final ListenableFuture<? extends T> future : futures) {
1194       AsyncSettableFuture<T> delegate = AsyncSettableFuture.create();
1195       // Must make sure to add the delegate to the queue first in case the future is already done
1196       delegates.add(delegate);
1197       future.addListener(new Runnable() {
1198         @Override public void run() {
1199           delegates.remove().setFuture(future);
1200         }
1201       }, executor);
1202       listBuilder.add(delegate);
1203     }
1204     return listBuilder.build();
1205   }
1206 
1207   /**
1208    * Registers separate success and failure callbacks to be run when the {@code
1209    * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1210    * complete} or, if the computation is already complete, immediately.
1211    *
1212    * <p>There is no guaranteed ordering of execution of callbacks, but any
1213    * callback added through this method is guaranteed to be called once the
1214    * computation is complete.
1215    *
1216    * Example: <pre> {@code
1217    * ListenableFuture<QueryResult> future = ...;
1218    * addCallback(future,
1219    *     new FutureCallback<QueryResult> {
1220    *       public void onSuccess(QueryResult result) {
1221    *         storeInCache(result);
1222    *       }
1223    *       public void onFailure(Throwable t) {
1224    *         reportError(t);
1225    *       }
1226    *     });}</pre>
1227    *
1228    * <p>Note: If the callback is slow or heavyweight, consider {@linkplain
1229    * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
1230    * executor}. If you do not supply an executor, {@code addCallback} will use
1231    * a {@linkplain MoreExecutors#directExecutor direct executor}, which carries
1232    * some caveats for heavier operations. For example, the callback may run on
1233    * an unpredictable or undesirable thread:
1234    *
1235    * <ul>
1236    * <li>If the input {@code Future} is done at the time {@code addCallback} is
1237    * called, {@code addCallback} will execute the callback inline.
1238    * <li>If the input {@code Future} is not yet done, {@code addCallback} will
1239    * schedule the callback to be run by the thread that completes the input
1240    * {@code Future}, which may be an internal system thread such as an RPC
1241    * network thread.
1242    * </ul>
1243    *
1244    * <p>Also note that, regardless of which thread executes the callback, all
1245    * other registered but unexecuted listeners are prevented from running
1246    * during its execution, even if those listeners are to run in other
1247    * executors.
1248    *
1249    * <p>For a more general interface to attach a completion listener to a
1250    * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1251    *
1252    * @param future The future attach the callback to.
1253    * @param callback The callback to invoke when {@code future} is completed.
1254    * @since 10.0
1255    */
1256   public static <V> void addCallback(ListenableFuture<V> future,
1257       FutureCallback<? super V> callback) {
1258     addCallback(future, callback, directExecutor());
1259   }
1260 
1261   /**
1262    * Registers separate success and failure callbacks to be run when the {@code
1263    * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1264    * complete} or, if the computation is already complete, immediately.
1265    *
1266    * <p>The callback is run in {@code executor}.
1267    * There is no guaranteed ordering of execution of callbacks, but any
1268    * callback added through this method is guaranteed to be called once the
1269    * computation is complete.
1270    *
1271    * Example: <pre> {@code
1272    * ListenableFuture<QueryResult> future = ...;
1273    * Executor e = ...
1274    * addCallback(future,
1275    *     new FutureCallback<QueryResult> {
1276    *       public void onSuccess(QueryResult result) {
1277    *         storeInCache(result);
1278    *       }
1279    *       public void onFailure(Throwable t) {
1280    *         reportError(t);
1281    *       }
1282    *     }, e);}</pre>
1283    *
1284    * <p>When the callback is fast and lightweight, consider {@linkplain
1285    * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
1286    * explicitly specifying {@code directExecutor}. However, be aware of the
1287    * caveats documented in the link above.
1288    *
1289    * <p>For a more general interface to attach a completion listener to a
1290    * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1291    *
1292    * @param future The future attach the callback to.
1293    * @param callback The callback to invoke when {@code future} is completed.
1294    * @param executor The executor to run {@code callback} when the future
1295    *    completes.
1296    * @since 10.0
1297    */
1298   public static <V> void addCallback(final ListenableFuture<V> future,
1299       final FutureCallback<? super V> callback, Executor executor) {
1300     Preconditions.checkNotNull(callback);
1301     Runnable callbackListener = new Runnable() {
1302       @Override
1303       public void run() {
1304         final V value;
1305         try {
1306           // TODO(user): (Before Guava release), validate that this
1307           // is the thing for IE.
1308           value = getUninterruptibly(future);
1309         } catch (ExecutionException e) {
1310           callback.onFailure(e.getCause());
1311           return;
1312         } catch (RuntimeException e) {
1313           callback.onFailure(e);
1314           return;
1315         } catch (Error e) {
1316           callback.onFailure(e);
1317           return;
1318         }
1319         callback.onSuccess(value);
1320       }
1321     };
1322     future.addListener(callbackListener, executor);
1323   }
1324 
1325   /**
1326    * Returns the result of {@link Future#get()}, converting most exceptions to a
1327    * new instance of the given checked exception type. This reduces boilerplate
1328    * for a common use of {@code Future} in which it is unnecessary to
1329    * programmatically distinguish between exception types or to extract other
1330    * information from the exception instance.
1331    *
1332    * <p>Exceptions from {@code Future.get} are treated as follows:
1333    * <ul>
1334    * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1335    *     {@code X} if the cause is a checked exception, an {@link
1336    *     UncheckedExecutionException} if the cause is a {@code
1337    *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1338    *     {@code Error}.
1339    * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1340    *     restoring the interrupt).
1341    * <li>Any {@link CancellationException} is propagated untouched, as is any
1342    *     other {@link RuntimeException} (though {@code get} implementations are
1343    *     discouraged from throwing such exceptions).
1344    * </ul>
1345    *
1346    * <p>The overall principle is to continue to treat every checked exception as a
1347    * checked exception, every unchecked exception as an unchecked exception, and
1348    * every error as an error. In addition, the cause of any {@code
1349    * ExecutionException} is wrapped in order to ensure that the new stack trace
1350    * matches that of the current thread.
1351    *
1352    * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1353    * public constructor that accepts zero or more arguments, all of type {@code
1354    * String} or {@code Throwable} (preferring constructors with at least one
1355    * {@code String}) and calling the constructor via reflection. If the
1356    * exception did not already have a cause, one is set by calling {@link
1357    * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1358    * {@code IllegalArgumentException} is thrown.
1359    *
1360    * @throws X if {@code get} throws any checked exception except for an {@code
1361    *         ExecutionException} whose cause is not itself a checked exception
1362    * @throws UncheckedExecutionException if {@code get} throws an {@code
1363    *         ExecutionException} with a {@code RuntimeException} as its cause
1364    * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1365    *         with an {@code Error} as its cause
1366    * @throws CancellationException if {@code get} throws a {@code
1367    *         CancellationException}
1368    * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1369    *         RuntimeException} or does not have a suitable constructor
1370    * @since 10.0
1371    */
1372   public static <V, X extends Exception> V get(
1373       Future<V> future, Class<X> exceptionClass) throws X {
1374     checkNotNull(future);
1375     checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1376         "Futures.get exception type (%s) must not be a RuntimeException",
1377         exceptionClass);
1378     try {
1379       return future.get();
1380     } catch (InterruptedException e) {
1381       currentThread().interrupt();
1382       throw newWithCause(exceptionClass, e);
1383     } catch (ExecutionException e) {
1384       wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1385       throw new AssertionError();
1386     }
1387   }
1388 
1389   /**
1390    * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1391    * exceptions to a new instance of the given checked exception type. This
1392    * reduces boilerplate for a common use of {@code Future} in which it is
1393    * unnecessary to programmatically distinguish between exception types or to
1394    * extract other information from the exception instance.
1395    *
1396    * <p>Exceptions from {@code Future.get} are treated as follows:
1397    * <ul>
1398    * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1399    *     {@code X} if the cause is a checked exception, an {@link
1400    *     UncheckedExecutionException} if the cause is a {@code
1401    *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1402    *     {@code Error}.
1403    * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1404    *     restoring the interrupt).
1405    * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1406    * <li>Any {@link CancellationException} is propagated untouched, as is any
1407    *     other {@link RuntimeException} (though {@code get} implementations are
1408    *     discouraged from throwing such exceptions).
1409    * </ul>
1410    *
1411    * <p>The overall principle is to continue to treat every checked exception as a
1412    * checked exception, every unchecked exception as an unchecked exception, and
1413    * every error as an error. In addition, the cause of any {@code
1414    * ExecutionException} is wrapped in order to ensure that the new stack trace
1415    * matches that of the current thread.
1416    *
1417    * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1418    * public constructor that accepts zero or more arguments, all of type {@code
1419    * String} or {@code Throwable} (preferring constructors with at least one
1420    * {@code String}) and calling the constructor via reflection. If the
1421    * exception did not already have a cause, one is set by calling {@link
1422    * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1423    * {@code IllegalArgumentException} is thrown.
1424    *
1425    * @throws X if {@code get} throws any checked exception except for an {@code
1426    *         ExecutionException} whose cause is not itself a checked exception
1427    * @throws UncheckedExecutionException if {@code get} throws an {@code
1428    *         ExecutionException} with a {@code RuntimeException} as its cause
1429    * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1430    *         with an {@code Error} as its cause
1431    * @throws CancellationException if {@code get} throws a {@code
1432    *         CancellationException}
1433    * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1434    *         RuntimeException} or does not have a suitable constructor
1435    * @since 10.0
1436    */
1437   public static <V, X extends Exception> V get(
1438       Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1439       throws X {
1440     checkNotNull(future);
1441     checkNotNull(unit);
1442     checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1443         "Futures.get exception type (%s) must not be a RuntimeException",
1444         exceptionClass);
1445     try {
1446       return future.get(timeout, unit);
1447     } catch (InterruptedException e) {
1448       currentThread().interrupt();
1449       throw newWithCause(exceptionClass, e);
1450     } catch (TimeoutException e) {
1451       throw newWithCause(exceptionClass, e);
1452     } catch (ExecutionException e) {
1453       wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1454       throw new AssertionError();
1455     }
1456   }
1457 
1458   private static <X extends Exception> void wrapAndThrowExceptionOrError(
1459       Throwable cause, Class<X> exceptionClass) throws X {
1460     if (cause instanceof Error) {
1461       throw new ExecutionError((Error) cause);
1462     }
1463     if (cause instanceof RuntimeException) {
1464       throw new UncheckedExecutionException(cause);
1465     }
1466     throw newWithCause(exceptionClass, cause);
1467   }
1468 
1469   /**
1470    * Returns the result of calling {@link Future#get()} uninterruptibly on a
1471    * task known not to throw a checked exception. This makes {@code Future} more
1472    * suitable for lightweight, fast-running tasks that, barring bugs in the
1473    * code, will not fail. This gives it exception-handling behavior similar to
1474    * that of {@code ForkJoinTask.join}.
1475    *
1476    * <p>Exceptions from {@code Future.get} are treated as follows:
1477    * <ul>
1478    * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1479    *     {@link UncheckedExecutionException} (if the cause is an {@code
1480    *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1481    *     Error}).
1482    * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1483    *     call. The interrupt is restored before {@code getUnchecked} returns.
1484    * <li>Any {@link CancellationException} is propagated untouched. So is any
1485    *     other {@link RuntimeException} ({@code get} implementations are
1486    *     discouraged from throwing such exceptions).
1487    * </ul>
1488    *
1489    * <p>The overall principle is to eliminate all checked exceptions: to loop to
1490    * avoid {@code InterruptedException}, to pass through {@code
1491    * CancellationException}, and to wrap any exception from the underlying
1492    * computation in an {@code UncheckedExecutionException} or {@code
1493    * ExecutionError}.
1494    *
1495    * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1496    * {@link Uninterruptibles#getUninterruptibly(Future)}.
1497    *
1498    * @throws UncheckedExecutionException if {@code get} throws an {@code
1499    *         ExecutionException} with an {@code Exception} as its cause
1500    * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1501    *         with an {@code Error} as its cause
1502    * @throws CancellationException if {@code get} throws a {@code
1503    *         CancellationException}
1504    * @since 10.0
1505    */
1506   public static <V> V getUnchecked(Future<V> future) {
1507     checkNotNull(future);
1508     try {
1509       return getUninterruptibly(future);
1510     } catch (ExecutionException e) {
1511       wrapAndThrowUnchecked(e.getCause());
1512       throw new AssertionError();
1513     }
1514   }
1515 
1516   private static void wrapAndThrowUnchecked(Throwable cause) {
1517     if (cause instanceof Error) {
1518       throw new ExecutionError((Error) cause);
1519     }
1520     /*
1521      * It's a non-Error, non-Exception Throwable. From my survey of such
1522      * classes, I believe that most users intended to extend Exception, so we'll
1523      * treat it like an Exception.
1524      */
1525     throw new UncheckedExecutionException(cause);
1526   }
1527 
1528   /*
1529    * TODO(user): FutureChecker interface for these to be static methods on? If
1530    * so, refer to it in the (static-method) Futures.get documentation
1531    */
1532 
1533   /*
1534    * Arguably we don't need a timed getUnchecked because any operation slow
1535    * enough to require a timeout is heavyweight enough to throw a checked
1536    * exception and therefore be inappropriate to use with getUnchecked. Further,
1537    * it's not clear that converting the checked TimeoutException to a
1538    * RuntimeException -- especially to an UncheckedExecutionException, since it
1539    * wasn't thrown by the computation -- makes sense, and if we don't convert
1540    * it, the user still has to write a try-catch block.
1541    *
1542    * If you think you would use this method, let us know.
1543    */
1544 
1545   private static <X extends Exception> X newWithCause(
1546       Class<X> exceptionClass, Throwable cause) {
1547     // getConstructors() guarantees this as long as we don't modify the array.
1548     @SuppressWarnings("unchecked")
1549     List<Constructor<X>> constructors =
1550         (List) Arrays.asList(exceptionClass.getConstructors());
1551     for (Constructor<X> constructor : preferringStrings(constructors)) {
1552       @Nullable X instance = newFromConstructor(constructor, cause);
1553       if (instance != null) {
1554         if (instance.getCause() == null) {
1555           instance.initCause(cause);
1556         }
1557         return instance;
1558       }
1559     }
1560     throw new IllegalArgumentException(
1561         "No appropriate constructor for exception of type " + exceptionClass
1562             + " in response to chained exception", cause);
1563   }
1564 
1565   private static <X extends Exception> List<Constructor<X>>
1566       preferringStrings(List<Constructor<X>> constructors) {
1567     return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1568   }
1569 
1570   private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1571       Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1572         @Override public Boolean apply(Constructor<?> input) {
1573           return asList(input.getParameterTypes()).contains(String.class);
1574         }
1575       }).reverse();
1576 
1577   @Nullable private static <X> X newFromConstructor(
1578       Constructor<X> constructor, Throwable cause) {
1579     Class<?>[] paramTypes = constructor.getParameterTypes();
1580     Object[] params = new Object[paramTypes.length];
1581     for (int i = 0; i < paramTypes.length; i++) {
1582       Class<?> paramType = paramTypes[i];
1583       if (paramType.equals(String.class)) {
1584         params[i] = cause.toString();
1585       } else if (paramType.equals(Throwable.class)) {
1586         params[i] = cause;
1587       } else {
1588         return null;
1589       }
1590     }
1591     try {
1592       return constructor.newInstance(params);
1593     } catch (IllegalArgumentException e) {
1594       return null;
1595     } catch (InstantiationException e) {
1596       return null;
1597     } catch (IllegalAccessException e) {
1598       return null;
1599     } catch (InvocationTargetException e) {
1600       return null;
1601     }
1602   }
1603 
  /**
   * Strategy used by {@code CombinedFuture} to turn the per-input results into
   * the single value of the combined future.
   *
   * @param <V> the result type of the input futures
   * @param <C> the type of the combined result
   */
  private interface FutureCombiner<V, C> {
    /**
     * Builds the combined result from {@code values}. {@code CombinedFuture}
     * fills unset slots of this list with {@code null}, so an entry may be
     * {@code null} rather than an {@code Optional}.
     */
    C combine(List<Optional<V>> values);
  }
1607 
  /**
   * A future whose result is computed by a {@link FutureCombiner} from the
   * results of a collection of input futures.
   *
   * <p>A listener is registered on every input. Each listener stores its
   * input's result in {@link #values} and decrements {@link #remaining}; the
   * listener that brings the counter to zero invokes the combiner and sets
   * this future's value. When {@link #allMustSucceed} is true, the first input
   * failure fails this future and a cancelled input cancels it.
   *
   * @param <V> the result type of the input futures
   * @param <C> the type of the combined result
   */
  private static class CombinedFuture<V, C> extends AbstractFuture<C> {
    private static final Logger logger =
        Logger.getLogger(CombinedFuture.class.getName());

    // The input futures. Set to null by the cleanup listener once this future
    // is done.
    ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
    // Whether a failed or cancelled input should fail or cancel this future.
    final boolean allMustSucceed;
    // Number of inputs whose listener has not finished yet; starts at
    // futures.size() and is decremented in setOneValue.
    final AtomicInteger remaining;
    // Produces the final result. Set to null by the cleanup listener.
    FutureCombiner<V, C> combiner;
    // One slot per input, in iteration order of 'futures'; slots start out as
    // null. Set to null by the cleanup listener.
    List<Optional<V>> values;
    // Guards seenExceptions.
    final Object seenExceptionsLock = new Object();
    // Throwables already reported while allMustSucceed is true; created
    // lazily in setExceptionAndMaybeLog.
    Set<Throwable> seenExceptions;

    /**
     * Stores the arguments, sizes the counter and the result list from
     * {@code futures.size()}, and then calls {@link #init}.
     */
    CombinedFuture(
        ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
        boolean allMustSucceed, Executor listenerExecutor,
        FutureCombiner<V, C> combiner) {
      this.futures = futures;
      this.allMustSucceed = allMustSucceed;
      this.remaining = new AtomicInteger(futures.size());
      this.combiner = combiner;
      this.values = Lists.newArrayListWithCapacity(futures.size());
      init(listenerExecutor);
    }

    /**
     * Must be called at the end of the constructor.
     *
     * <p>Registers the cleanup listener on this future, completes immediately
     * if there are no inputs, and otherwise registers one listener per input
     * on {@code listenerExecutor}.
     */
    protected void init(final Executor listenerExecutor) {
      // First, schedule cleanup to execute when the Future is done.
      addListener(new Runnable() {
        @Override
        public void run() {
          // Cancel all the component futures.
          if (CombinedFuture.this.isCancelled()) {
            for (ListenableFuture<?> future : CombinedFuture.this.futures) {
              future.cancel(CombinedFuture.this.wasInterrupted());
            }
          }

          // Let go of the memory held by other futures
          CombinedFuture.this.futures = null;

          // By now the values array has either been set as the Future's value,
          // or (in case of failure) is no longer useful.
          CombinedFuture.this.values = null;

          // The combiner may also hold state, so free that as well
          CombinedFuture.this.combiner = null;
        }
      }, directExecutor());

      // Now begin the "real" initialization.

      // Corner case: List is empty.
      if (futures.isEmpty()) {
        set(combiner.combine(ImmutableList.<Optional<V>>of()));
        return;
      }

      // Populate the results list with null initially.
      for (int i = 0; i < futures.size(); ++i) {
        values.add(null);
      }

      // Register a listener on each Future in the list to update
      // the state of this future.
      // Note that if all the futures on the list are done prior to completing
      // this loop, the last call to addListener() will callback to
      // setOneValue(), transitively call our cleanup listener, and set
      // this.futures to null.
      // This is not actually a problem, since the foreach only needs
      // this.futures to be non-null at the beginning of the loop.
      int i = 0;
      for (final ListenableFuture<? extends V> listenable : futures) {
        // Capture the input's position so its result lands in the right slot.
        final int index = i++;
        listenable.addListener(new Runnable() {
          @Override
          public void run() {
            setOneValue(index, listenable);
          }
        }, listenerExecutor);
      }
    }

    /**
     * Fails this future with the given Throwable if {@link #allMustSucceed} is
     * true. Also, logs the throwable if it is an {@link Error} or if
     * {@link #allMustSucceed} is {@code true}, the throwable did not cause
     * this future to fail, and it is the first time we've seen that particular Throwable.
     */
    private void setExceptionAndMaybeLog(Throwable throwable) {
      boolean visibleFromOutputFuture = false;
      boolean firstTimeSeeingThisException = true;
      if (allMustSucceed) {
        // As soon as the first one fails, throw the exception up.
        // The result of all other inputs is then ignored.
        visibleFromOutputFuture = super.setException(throwable);

        // Record the throwable so that the same instance arriving again is
        // not logged a second time.
        synchronized (seenExceptionsLock) {
          if (seenExceptions == null) {
            seenExceptions = Sets.newHashSet();
          }
          firstTimeSeeingThisException = seenExceptions.add(throwable);
        }
      }

      if (throwable instanceof Error
          || (allMustSucceed && !visibleFromOutputFuture && firstTimeSeeingThisException)) {
        logger.log(Level.SEVERE, "input future failed.", throwable);
      }
    }

    /**
     * Sets the value at the given index to that of the given future.
     *
     * <p>Runs as the listener of one input. Whatever the outcome, it
     * decrements {@link #remaining}; the call that reaches zero combines the
     * collected values and sets the result of this future.
     */
    private void setOneValue(int index, Future<? extends V> future) {
      // Work on a local snapshot: the cleanup listener may null out the field
      // at any time.
      List<Optional<V>> localValues = values;
      // TODO(user): This check appears to be redundant since values is
      // assigned null only after the future completes.  However, values
      // is not volatile so it may be possible for us to observe the changes
      // to these two values in a different order... which I think is why
      // we need to check both.  Clear up this craziness either by making
      // values volatile or proving that it doesn't need to be for some other
      // reason.
      if (isDone() || localValues == null) {
        // Some other future failed or has been cancelled, causing this one to
        // also be cancelled or have an exception set. This should only happen
        // if allMustSucceed is true or if the output itself has been
        // cancelled.
        // Note that this check sits outside the try block below, so a failure
        // here skips the bookkeeping in the finally clause.
        checkState(allMustSucceed || isCancelled(),
            "Future was done before all dependencies completed");
      }

      try {
        checkState(future.isDone(),
            "Tried to set value from future which is not done");
        V returnValue = getUninterruptibly(future);
        if (localValues != null) {
          localValues.set(index, Optional.fromNullable(returnValue));
        }
      } catch (CancellationException e) {
        if (allMustSucceed) {
          // Set ourselves as cancelled. Let the input futures keep running
          // as some of them may be used elsewhere.
          cancel(false);
        }
      } catch (ExecutionException e) {
        setExceptionAndMaybeLog(e.getCause());
      } catch (Throwable t) {
        // Anything else thrown in the try block, including the
        // IllegalStateException from the checkState above.
        setExceptionAndMaybeLog(t);
      } finally {
        int newRemaining = remaining.decrementAndGet();
        checkState(newRemaining >= 0, "Less than 0 remaining futures");
        if (newRemaining == 0) {
          // Last input to report in: publish the combined result, unless the
          // cleanup listener has already released the combiner or the values.
          FutureCombiner<V, C> localCombiner = combiner;
          if (localCombiner != null && localValues != null) {
            set(localCombiner.combine(localValues));
          } else {
            checkState(isDone());
          }
        }
      }
    }
  }
1772 
1773   /** Used for {@link #allAsList} and {@link #successfulAsList}. */
1774   private static <V> ListenableFuture<List<V>> listFuture(
1775       ImmutableList<ListenableFuture<? extends V>> futures,
1776       boolean allMustSucceed, Executor listenerExecutor) {
1777     return new CombinedFuture<V, List<V>>(
1778         futures, allMustSucceed, listenerExecutor,
1779         new FutureCombiner<V, List<V>>() {
1780           @Override
1781           public List<V> combine(List<Optional<V>> values) {
1782             List<V> result = Lists.newArrayList();
1783             for (Optional<V> element : values) {
1784               result.add(element != null ? element.orNull() : null);
1785             }
1786             return Collections.unmodifiableList(result);
1787           }
1788         });
1789   }
1790 
1791   /**
1792    * A checked future that uses a function to map from exceptions to the
1793    * appropriate checked type.
1794    */
1795   private static class MappingCheckedFuture<V, X extends Exception> extends
1796       AbstractCheckedFuture<V, X> {
1797 
1798     final Function<? super Exception, X> mapper;
1799 
1800     MappingCheckedFuture(ListenableFuture<V> delegate,
1801         Function<? super Exception, X> mapper) {
1802       super(delegate);
1803 
1804       this.mapper = checkNotNull(mapper);
1805     }
1806 
1807     @Override
1808     protected X mapException(Exception e) {
1809       return mapper.apply(e);
1810     }
1811   }
1812 }
1813