• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2006 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
23 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
24 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
25 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
26 import static java.lang.Thread.currentThread;
27 import static java.util.Arrays.asList;
28 import static java.util.concurrent.TimeUnit.NANOSECONDS;
29 
30 import com.google.common.annotations.Beta;
31 import com.google.common.base.Function;
32 import com.google.common.base.Preconditions;
33 import com.google.common.collect.ImmutableList;
34 import com.google.common.collect.Lists;
35 import com.google.common.collect.Ordering;
36 
37 import java.lang.reflect.Constructor;
38 import java.lang.reflect.InvocationTargetException;
39 import java.lang.reflect.UndeclaredThrowableException;
40 import java.util.Arrays;
41 import java.util.List;
42 import java.util.concurrent.BlockingQueue;
43 import java.util.concurrent.CancellationException;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.Executor;
47 import java.util.concurrent.Future;
48 import java.util.concurrent.LinkedBlockingQueue;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import java.util.concurrent.atomic.AtomicInteger;
52 
53 import javax.annotation.Nullable;
54 
55 /**
56  * Static utility methods pertaining to the {@link Future} interface.
57  *
58  * @author Kevin Bourrillion
59  * @author Nishant Thakkar
60  * @author Sven Mawson
61  * @since 1.0
62  */
63 @Beta
64 public final class Futures {
Futures()65   private Futures() {}
66 
67   /**
68    * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
69    * and a {@link Function} that maps from {@link Exception} instances into the
70    * appropriate checked type.
71    *
72    * <p>The given mapping function will be applied to an
73    * {@link InterruptedException}, a {@link CancellationException}, or an
74    * {@link ExecutionException} with the actual cause of the exception.
75    * See {@link Future#get()} for details on the exceptions thrown.
76    *
77    * @since 9.0 (source-compatible since 1.0)
78    */
makeChecked( ListenableFuture<V> future, Function<Exception, X> mapper)79   public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
80       ListenableFuture<V> future, Function<Exception, X> mapper) {
81     return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
82   }
83 
84   /**
85    * Creates a {@code ListenableFuture} which has its value set immediately upon
86    * construction. The getters just return the value. This {@code Future} can't
87    * be canceled or timed out and its {@code isDone()} method always returns
88    * {@code true}.
89    */
immediateFuture(@ullable V value)90   public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
91     SettableFuture<V> future = SettableFuture.create();
92     future.set(value);
93     return future;
94   }
95 
96   /**
97    * Returns a {@code CheckedFuture} which has its value set immediately upon
98    * construction.
99    *
100    * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
101    * method always returns {@code true}. Calling {@code get()} or {@code
102    * checkedGet()} will immediately return the provided value.
103    */
104   public static <V, X extends Exception> CheckedFuture<V, X>
immediateCheckedFuture(@ullable V value)105       immediateCheckedFuture(@Nullable V value) {
106     SettableFuture<V> future = SettableFuture.create();
107     future.set(value);
108     return Futures.makeChecked(future, new Function<Exception, X>() {
109       @Override
110       public X apply(Exception e) {
111         throw new AssertionError("impossible");
112       }
113     });
114   }
115 
116   /**
117    * Returns a {@code ListenableFuture} which has an exception set immediately
118    * upon construction.
119    *
120    * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
121    * method always returns {@code true}. Calling {@code get()} will immediately
122    * throw the provided {@code Throwable} wrapped in an {@code
123    * ExecutionException}.
124    *
125    * @throws Error if the throwable is an {@link Error}.
126    */
127   public static <V> ListenableFuture<V> immediateFailedFuture(
128       Throwable throwable) {
129     checkNotNull(throwable);
130     SettableFuture<V> future = SettableFuture.create();
131     future.setException(throwable);
132     return future;
133   }
134 
135   /**
136    * Returns a {@code CheckedFuture} which has an exception set immediately upon
137    * construction.
138    *
139    * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
140    * method always returns {@code true}. Calling {@code get()} will immediately
141    * throw the provided {@code Throwable} wrapped in an {@code
142    * ExecutionException}, and calling {@code checkedGet()} will throw the
143    * provided exception itself.
144    *
145    * @throws Error if the throwable is an {@link Error}.
146    */
147   public static <V, X extends Exception> CheckedFuture<V, X>
148       immediateFailedCheckedFuture(final X exception) {
149     checkNotNull(exception);
150     return makeChecked(Futures.<V>immediateFailedFuture(exception),
151         new Function<Exception, X>() {
152           @Override
153           public X apply(Exception e) {
154             return exception;
155           }
156         });
157   }
158 
159   /**
160    * <p>Returns a new {@code ListenableFuture} whose result is asynchronously
161    * derived from the result of the given {@code Future}. More precisely, the
162    * returned {@code Future} takes its result from a {@code Future} produced by
163    * applying the given {@code Function} to the result of the original {@code
164    * Future}. Example:
165    *
166    * <pre>   {@code
167    *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
168    *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
169    *       new Function<RowKey, ListenableFuture<QueryResult>>() {
170    *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
171    *           return dataService.read(rowKey);
172    *         }
173    *       };
174    *   ListenableFuture<QueryResult> queryFuture =
175    *       chain(rowKeyFuture, queryFunction);
176    * }</pre>
177    *
178    * <p>Note: This overload of {@code chain} is designed for cases in which the
179    * work of creating the derived future is fast and lightweight, as the method
180    * does not accept an {@code Executor} in which to perform the the work. For
181    * heavier derivations, this overload carries some caveats: First, the thread
182    * that the derivation runs in depends on whether the input {@code Future} is
183    * done at the time {@code chain} is called. In particular, if called late,
184    * {@code chain} will run the derivation in the thread that called {@code
185    * chain}.  Second, derivations may run in an internal thread of the system
186    * responsible for the input {@code Future}, such as an RPC network thread.
187    * Finally, during the execution of a {@code sameThreadExecutor} {@code
188    * chain} function, all other registered but unexecuted listeners are
189    * prevented from running, even if those listeners are to run in other
190    * executors.
191    *
192    * <p>The returned {@code Future} attempts to keep its cancellation state in
193    * sync with that of the input future and that of the future returned by the
194    * chain function. That is, if the returned {@code Future} is cancelled, it
195    * will attempt to cancel the other two, and if either of the other two is
196    * cancelled, the returned {@code Future} will receive a callback in which it
197    * will attempt to cancel itself.
198    *
199    * @param input The future to chain
200    * @param function A function to chain the results of the provided future
201    *     to the results of the returned future.  This will be run in the thread
202    *     that notifies input it is complete.
203    * @return A future that holds result of the chain.
204    * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and
205    *     use {@link #transform(ListenableFuture, AsyncFunction)}. This method is
206    *     scheduled to be removed from Guava in Guava release 12.0.
207    */
208   @Deprecated
209   public static <I, O> ListenableFuture<O> chain(
210       ListenableFuture<I> input,
211       Function<? super I, ? extends ListenableFuture<? extends O>> function) {
212     return chain(input, function, MoreExecutors.sameThreadExecutor());
213   }
214 
215   /**
216    * <p>Returns a new {@code ListenableFuture} whose result is asynchronously
217    * derived from the result of the given {@code Future}. More precisely, the
218    * returned {@code Future} takes its result from a {@code Future} produced by
219    * applying the given {@code Function} to the result of the original {@code
220    * Future}. Example:
221    *
222    * <pre>   {@code
223    *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
224    *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
225    *       new Function<RowKey, ListenableFuture<QueryResult>>() {
226    *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
227    *           return dataService.read(rowKey);
228    *         }
229    *       };
230    *   ListenableFuture<QueryResult> queryFuture =
231    *       chain(rowKeyFuture, queryFunction, executor);
232    * }</pre>
233    *
234    * <p>The returned {@code Future} attempts to keep its cancellation state in
235    * sync with that of the input future and that of the future returned by the
236    * chain function. That is, if the returned {@code Future} is cancelled, it
237    * will attempt to cancel the other two, and if either of the other two is
238    * cancelled, the returned {@code Future} will receive a callback in which it
239    * will attempt to cancel itself.
240    *
241    * <p>Note: For cases in which the work of creating the derived future is
242    * fast and lightweight, consider {@linkplain Futures#chain(ListenableFuture,
243    * Function) the other overload} or explicit use of {@code
244    * sameThreadExecutor}. For heavier derivations, this choice carries some
245    * caveats: First, the thread that the derivation runs in depends on whether
246    * the input {@code Future} is done at the time {@code chain} is called. In
247    * particular, if called late, {@code chain} will run the derivation in the
248    * thread that called {@code chain}. Second, derivations may run in an
249    * internal thread of the system responsible for the input {@code Future},
250    * such as an RPC network thread. Finally, during the execution of a {@code
251    * sameThreadExecutor} {@code chain} function, all other registered but
252    * unexecuted listeners are prevented from running, even if those listeners
253    * are to run in other executors.
254    *
255    * @param input The future to chain
256    * @param function A function to chain the results of the provided future
257    *     to the results of the returned future.
258    * @param executor Executor to run the function in.
259    * @return A future that holds result of the chain.
260    * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and
261    *     use {@link #transform(ListenableFuture, AsyncFunction, Executor)}. This
262    *     method is scheduled to be removed from Guava in Guava release 12.0.
263    */
264   @Deprecated
265   public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
266       final Function<? super I, ? extends ListenableFuture<? extends O>>
267           function,
268       Executor executor) {
269     checkNotNull(function);
270     ChainingListenableFuture<I, O> chain =
271         new ChainingListenableFuture<I, O>(new AsyncFunction<I, O>() {
272           @Override
273           /*
274            * All methods of ListenableFuture are covariant, and we don't expose
275            * the object anywhere that would allow it to be downcast.
276            */
277           @SuppressWarnings("unchecked")
278           public ListenableFuture<O> apply(I input) {
279             return (ListenableFuture) function.apply(input);
280           }
281         }, input);
282     input.addListener(chain, executor);
283     return chain;
284   }
285 
286   /**
287    * Returns a new {@code ListenableFuture} whose result is asynchronously
288    * derived from the result of the given {@code Future}. More precisely, the
289    * returned {@code Future} takes its result from a {@code Future} produced by
290    * applying the given {@code AsyncFunction} to the result of the original
291    * {@code Future}. Example:
292    *
293    * <pre>   {@code
294    *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
295    *   AsyncFunction<RowKey, QueryResult> queryFunction =
296    *       new AsyncFunction<RowKey, QueryResult>() {
297    *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
298    *           return dataService.read(rowKey);
299    *         }
300    *       };
301    *   ListenableFuture<QueryResult> queryFuture =
302    *       transform(rowKeyFuture, queryFunction);
303    * }</pre>
304    *
305    * <p>Note: This overload of {@code transform} is designed for cases in which
306    * the work of creating the derived {@code Future} is fast and lightweight,
307    * as the method does not accept an {@code Executor} in which to perform the
308    * the work. (The created {@code Future} itself need not complete quickly.)
309    * For heavier operations, this overload carries some caveats: First, the
310    * thread that {@code function.apply} runs in depends on whether the input
311    * {@code Future} is done at the time {@code transform} is called. In
312    * particular, if called late, {@code transform} will run the operation in
313    * the thread that called {@code transform}.  Second, {@code function.apply}
314    * may run in an internal thread of the system responsible for the input
315    * {@code Future}, such as an RPC network thread.  Finally, during the
316    * execution of a {@code sameThreadExecutor} {@code function.apply}, all
317    * other registered but unexecuted listeners are prevented from running, even
318    * if those listeners are to run in other executors.
319    *
320    * <p>The returned {@code Future} attempts to keep its cancellation state in
321    * sync with that of the input future and that of the future returned by the
322    * function. That is, if the returned {@code Future} is cancelled, it will
323    * attempt to cancel the other two, and if either of the other two is
324    * cancelled, the returned {@code Future} will receive a callback in which it
325    * will attempt to cancel itself.
326    *
327    * @param input The future to transform
328    * @param function A function to transform the result of the input future
329    *     to the result of the output future
330    * @return A future that holds result of the function (if the input succeeded)
331    *     or the original input's failure (if not)
332    * @since 11.0
333    */
334   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
335       AsyncFunction<? super I, ? extends O> function) {
336     return transform(input, function, MoreExecutors.sameThreadExecutor());
337   }
338 
339   /**
340    * Returns a new {@code ListenableFuture} whose result is asynchronously
341    * derived from the result of the given {@code Future}. More precisely, the
342    * returned {@code Future} takes its result from a {@code Future} produced by
343    * applying the given {@code AsyncFunction} to the result of the original
344    * {@code Future}. Example:
345    *
346    * <pre>   {@code
347    *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
348    *   AsyncFunction<RowKey, QueryResult> queryFunction =
349    *       new AsyncFunction<RowKey, QueryResult>() {
350    *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
351    *           return dataService.read(rowKey);
352    *         }
353    *       };
354    *   ListenableFuture<QueryResult> queryFuture =
355    *       transform(rowKeyFuture, queryFunction, executor);
356    * }</pre>
357    *
358    * <p>The returned {@code Future} attempts to keep its cancellation state in
359    * sync with that of the input future and that of the future returned by the
360    * chain function. That is, if the returned {@code Future} is cancelled, it
361    * will attempt to cancel the other two, and if either of the other two is
362    * cancelled, the returned {@code Future} will receive a callback in which it
363    * will attempt to cancel itself.
364    *
365    * <p>Note: For cases in which the work of creating the derived future is
366    * fast and lightweight, consider {@linkplain
367    * Futures#transform(ListenableFuture, Function) the other overload} or
368    * explicit use of {@code sameThreadExecutor}. For heavier derivations, this
369    * choice carries some caveats: First, the thread that {@code function.apply}
370    * runs in depends on whether the input {@code Future} is done at the time
371    * {@code transform} is called. In particular, if called late, {@code
372    * transform} will run the operation in the thread that called {@code
373    * transform}.  Second, {@code function.apply} may run in an internal thread
374    * of the system responsible for the input {@code Future}, such as an RPC
375    * network thread.  Finally, during the execution of a {@code
376    * sameThreadExecutor} {@code function.apply}, all other registered but
377    * unexecuted listeners are prevented from running, even if those listeners
378    * are to run in other executors.
379    *
380    * @param input The future to transform
381    * @param function A function to transform the result of the input future
382    *     to the result of the output future
383    * @param executor Executor to run the function in.
384    * @return A future that holds result of the function (if the input succeeded)
385    *     or the original input's failure (if not)
386    * @since 11.0
387    */
388   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
389       AsyncFunction<? super I, ? extends O> function,
390       Executor executor) {
391     ChainingListenableFuture<I, O> output =
392         new ChainingListenableFuture<I, O>(function, input);
393     input.addListener(output, executor);
394     return output;
395   }
396 
397   /**
398    * Returns a new {@code ListenableFuture} whose result is the product of
399    * applying the given {@code Function} to the result of the given {@code
400    * Future}. Example:
401    *
402    * <pre>   {@code
403    *   ListenableFuture<QueryResult> queryFuture = ...;
404    *   Function<QueryResult, List<Row>> rowsFunction =
405    *       new Function<QueryResult, List<Row>>() {
406    *         public List<Row> apply(QueryResult queryResult) {
407    *           return queryResult.getRows();
408    *         }
409    *       };
410    *   ListenableFuture<List<Row>> rowsFuture =
411    *       transform(queryFuture, rowsFunction);
412    * }</pre>
413    *
414    * <p>Note: This overload of {@code transform} is designed for cases in which
415    * the transformation is fast and lightweight, as the method does not accept
416    * an {@code Executor} in which to perform the the work. For heavier
417    * transformations, this overload carries some caveats: First, the thread
418    * that the transformation runs in depends on whether the input {@code
419    * Future} is done at the time {@code transform} is called. In particular, if
420    * called late, {@code transform} will perform the transformation in the
421    * thread that called {@code transform}. Second, transformations may run in
422    * an internal thread of the system responsible for the input {@code Future},
423    * such as an RPC network thread. Finally, during the execution of a {@code
424    * sameThreadExecutor} transformation, all other registered but unexecuted
425    * listeners are prevented from running, even if those listeners are to run
426    * in other executors.
427    *
428    * <p>The returned {@code Future} attempts to keep its cancellation state in
429    * sync with that of the input future. That is, if the returned {@code Future}
430    * is cancelled, it will attempt to cancel the input, and if the input is
431    * cancelled, the returned {@code Future} will receive a callback in which it
432    * will attempt to cancel itself.
433    *
434    * <p>An example use of this method is to convert a serializable object
435    * returned from an RPC into a POJO.
436    *
437    * @param future The future to transform
438    * @param function A Function to transform the results of the provided future
439    *     to the results of the returned future.  This will be run in the thread
440    *     that notifies input it is complete.
441    * @return A future that holds result of the transformation.
442    * @since 9.0 (in 1.0 as {@code compose})
443    */
444   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
445       final Function<? super I, ? extends O> function) {
446     return transform(future, function, MoreExecutors.sameThreadExecutor());
447   }
448 
449   /**
450    * Returns a new {@code ListenableFuture} whose result is the product of
451    * applying the given {@code Function} to the result of the given {@code
452    * Future}. Example:
453    *
454    * <pre>   {@code
455    *   ListenableFuture<QueryResult> queryFuture = ...;
456    *   Function<QueryResult, List<Row>> rowsFunction =
457    *       new Function<QueryResult, List<Row>>() {
458    *         public List<Row> apply(QueryResult queryResult) {
459    *           return queryResult.getRows();
460    *         }
461    *       };
462    *   ListenableFuture<List<Row>> rowsFuture =
463    *       transform(queryFuture, rowsFunction, executor);
464    * }</pre>
465    *
466    * <p>The returned {@code Future} attempts to keep its cancellation state in
467    * sync with that of the input future. That is, if the returned {@code Future}
468    * is cancelled, it will attempt to cancel the input, and if the input is
469    * cancelled, the returned {@code Future} will receive a callback in which it
470    * will attempt to cancel itself.
471    *
472    * <p>An example use of this method is to convert a serializable object
473    * returned from an RPC into a POJO.
474    *
475    * <p>Note: For cases in which the transformation is fast and lightweight,
476    * consider {@linkplain Futures#transform(ListenableFuture, Function) the
477    * other overload} or explicit use of {@link
478    * MoreExecutors#sameThreadExecutor}. For heavier transformations, this
479    * choice carries some caveats: First, the thread that the transformation
480    * runs in depends on whether the input {@code Future} is done at the time
481    * {@code transform} is called. In particular, if called late, {@code
482    * transform} will perform the transformation in the thread that called
483    * {@code transform}.  Second, transformations may run in an internal thread
484    * of the system responsible for the input {@code Future}, such as an RPC
485    * network thread.  Finally, during the execution of a {@code
486    * sameThreadExecutor} transformation, all other registered but unexecuted
487    * listeners are prevented from running, even if those listeners are to run
488    * in other executors.
489    *
490    * @param future The future to transform
491    * @param function A Function to transform the results of the provided future
492    *     to the results of the returned future.
493    * @param executor Executor to run the function in.
494    * @return A future that holds result of the transformation.
495    * @since 9.0 (in 2.0 as {@code compose})
496    */
497   public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
498       final Function<? super I, ? extends O> function, Executor executor) {
499     checkNotNull(function);
500     Function<I, ListenableFuture<O>> wrapperFunction
501         = new Function<I, ListenableFuture<O>>() {
502             @Override public ListenableFuture<O> apply(I input) {
503               O output = function.apply(input);
504               return immediateFuture(output);
505             }
506         };
507     return chain(future, wrapperFunction, executor);
508   }
509 
510   /**
511    * Like {@link #transform(ListenableFuture, Function)} except that the
512    * transformation {@code function} is invoked on each call to
513    * {@link Future#get() get()} on the returned future.
514    *
515    * <p>The returned {@code Future} reflects the input's cancellation
516    * state directly, and any attempt to cancel the returned Future is likewise
517    * passed through to the input Future.
518    *
519    * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
520    * only apply the timeout to the execution of the underlying {@code Future},
521    * <em>not</em> to the execution of the transformation function.
522    *
523    * <p>The primary audience of this method is callers of {@code transform}
524    * who don't have a {@code ListenableFuture} available and
525    * do not mind repeated, lazy function evaluation.
526    *
527    * @param future The future to transform
528    * @param function A Function to transform the results of the provided future
529    *     to the results of the returned future.
530    * @return A future that returns the result of the transformation.
531    * @since 10.0
532    */
533   @Beta
534   public static <I, O> Future<O> lazyTransform(final Future<I> future,
535       final Function<? super I, ? extends O> function) {
536     checkNotNull(future);
537     checkNotNull(function);
538     return new Future<O>() {
539 
540       @Override
541       public boolean cancel(boolean mayInterruptIfRunning) {
542         return future.cancel(mayInterruptIfRunning);
543       }
544 
545       @Override
546       public boolean isCancelled() {
547         return future.isCancelled();
548       }
549 
550       @Override
551       public boolean isDone() {
552         return future.isDone();
553       }
554 
555       @Override
556       public O get() throws InterruptedException, ExecutionException {
557         return applyTransformation(future.get());
558       }
559 
560       @Override
561       public O get(long timeout, TimeUnit unit)
562           throws InterruptedException, ExecutionException, TimeoutException {
563         return applyTransformation(future.get(timeout, unit));
564       }
565 
566       private O applyTransformation(I input) throws ExecutionException {
567         try {
568           return function.apply(input);
569         } catch (Throwable t) {
570           throw new ExecutionException(t);
571         }
572       }
573     };
574   }
575 
576   /**
577    * An implementation of {@code ListenableFuture} that also implements
578    * {@code Runnable} so that it can be used to nest ListenableFutures.
579    * Once the passed-in {@code ListenableFuture} is complete, it calls the
580    * passed-in {@code Function} to generate the result.
581    *
582    * <p>If the function throws any checked exceptions, they should be wrapped
583    * in a {@code UndeclaredThrowableException} so that this class can get
584    * access to the cause.
585    */
586   private static class ChainingListenableFuture<I, O>
587       extends AbstractFuture<O> implements Runnable {
588 
589     private AsyncFunction<? super I, ? extends O> function;
590     private ListenableFuture<? extends I> inputFuture;
591     private volatile ListenableFuture<? extends O> outputFuture;
592     private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
593         new LinkedBlockingQueue<Boolean>(1);
594     private final CountDownLatch outputCreated = new CountDownLatch(1);
595 
596     private ChainingListenableFuture(
597         AsyncFunction<? super I, ? extends O> function,
598         ListenableFuture<? extends I> inputFuture) {
599       this.function = checkNotNull(function);
600       this.inputFuture = checkNotNull(inputFuture);
601     }
602 
603     /**
604      * Delegate the get() to the input and output futures, in case
605      * their implementations defer starting computation until their
606      * own get() is invoked.
607      */
608     @Override
609     public O get() throws InterruptedException, ExecutionException {
610       if (!isDone()) {
611         // Invoking get on the inputFuture will ensure our own run()
612         // method below is invoked as a listener when inputFuture sets
613         // its value.  Therefore when get() returns we should then see
614         // the outputFuture be created.
615         ListenableFuture<? extends I> inputFuture = this.inputFuture;
616         if (inputFuture != null) {
617           inputFuture.get();
618         }
619 
620         // If our listener was scheduled to run on an executor we may
621         // need to wait for our listener to finish running before the
622         // outputFuture has been constructed by the function.
623         outputCreated.await();
624 
625         // Like above with the inputFuture, we have a listener on
626         // the outputFuture that will set our own value when its
627         // value is set.  Invoking get will ensure the output can
628         // complete and invoke our listener, so that we can later
629         // get the result.
630         ListenableFuture<? extends O> outputFuture = this.outputFuture;
631         if (outputFuture != null) {
632           outputFuture.get();
633         }
634       }
635       return super.get();
636     }
637 
638     /**
639      * Delegate the get() to the input and output futures, in case
640      * their implementations defer starting computation until their
641      * own get() is invoked.
642      */
643     @Override
644     public O get(long timeout, TimeUnit unit) throws TimeoutException,
645         ExecutionException, InterruptedException {
646       if (!isDone()) {
647         // Use a single time unit so we can decrease remaining timeout
648         // as we wait for various phases to complete.
649         if (unit != NANOSECONDS) {
650           timeout = NANOSECONDS.convert(timeout, unit);
651           unit = NANOSECONDS;
652         }
653 
654         // Invoking get on the inputFuture will ensure our own run()
655         // method below is invoked as a listener when inputFuture sets
656         // its value.  Therefore when get() returns we should then see
657         // the outputFuture be created.
658         ListenableFuture<? extends I> inputFuture = this.inputFuture;
659         if (inputFuture != null) {
660           long start = System.nanoTime();
661           inputFuture.get(timeout, unit);
662           timeout -= Math.max(0, System.nanoTime() - start);
663         }
664 
665         // If our listener was scheduled to run on an executor we may
666         // need to wait for our listener to finish running before the
667         // outputFuture has been constructed by the function.
668         long start = System.nanoTime();
669         if (!outputCreated.await(timeout, unit)) {
670           throw new TimeoutException();
671         }
672         timeout -= Math.max(0, System.nanoTime() - start);
673 
674         // Like above with the inputFuture, we have a listener on
675         // the outputFuture that will set our own value when its
676         // value is set.  Invoking get will ensure the output can
677         // complete and invoke our listener, so that we can later
678         // get the result.
679         ListenableFuture<? extends O> outputFuture = this.outputFuture;
680         if (outputFuture != null) {
681           outputFuture.get(timeout, unit);
682         }
683       }
684       return super.get(timeout, unit);
685     }
686 
687     @Override
688     public boolean cancel(boolean mayInterruptIfRunning) {
689       /*
690        * Our additional cancellation work needs to occur even if
691        * !mayInterruptIfRunning, so we can't move it into interruptTask().
692        */
693       if (super.cancel(mayInterruptIfRunning)) {
694         // This should never block since only one thread is allowed to cancel
695         // this Future.
696         putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
697         cancel(inputFuture, mayInterruptIfRunning);
698         cancel(outputFuture, mayInterruptIfRunning);
699         return true;
700       }
701       return false;
702     }
703 
704     private void cancel(@Nullable Future<?> future,
705         boolean mayInterruptIfRunning) {
706       if (future != null) {
707         future.cancel(mayInterruptIfRunning);
708       }
709     }
710 
    /**
     * Listener body: reads the input future's result, applies the function
     * to obtain the output future, and arranges for the output's outcome
     * (value, failure or cancellation) to become this future's outcome.
     * Whatever path is taken, the function and input references are
     * released and {@code outputCreated} is counted down so that the get()
     * methods can proceed.
     */
    @Override
    public void run() {
      try {
        I sourceResult;
        try {
          sourceResult = getUninterruptibly(inputFuture);
        } catch (CancellationException e) {
          // Cancel this future and return.
          // At this point, inputFuture is cancelled and outputFuture doesn't
          // exist, so the value of mayInterruptIfRunning is irrelevant.
          cancel(false);
          return;
        } catch (ExecutionException e) {
          // Set the cause of the exception as this future's exception
          setException(e.getCause());
          return;
        }

        // Publish the output future in the field (so cancel() and get() can
        // see it) and keep a final local copy for the listener below.
        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
            function.apply(sourceResult);
        if (isCancelled()) {
          // Handles the case where cancel was called while the function was
          // being applied.
          // There is a gap in cancel(boolean) between calling sync.cancel()
          // and storing the value of mayInterruptIfRunning, so this thread
          // needs to block, waiting for that value.
          outputFuture.cancel(
              takeUninterruptibly(mayInterruptIfRunningChannel));
          this.outputFuture = null;
          return;
        }
        // Forward the output future's outcome to this future once the
        // output completes.
        outputFuture.addListener(new Runnable() {
            @Override
            public void run() {
              try {
                // Here it would have been nice to have had an
                // UninterruptibleListenableFuture, but we don't want to start a
                // combinatorial explosion of interfaces, so we have to make do.
                set(getUninterruptibly(outputFuture));
              } catch (CancellationException e) {
                // Cancel this future and return.
                // At this point, inputFuture and outputFuture are done, so the
                // value of mayInterruptIfRunning is irrelevant.
                cancel(false);
                return;
              } catch (ExecutionException e) {
                // Set the cause of the exception as this future's exception
                setException(e.getCause());
              } finally {
                // Don't pin inputs beyond completion
                ChainingListenableFuture.this.outputFuture = null;
              }
            }
          }, MoreExecutors.sameThreadExecutor());
      } catch (UndeclaredThrowableException e) {
        // Set the cause of the exception as this future's exception
        setException(e.getCause());
      } catch (Exception e) {
        // This exception is irrelevant in this thread, but useful for the
        // client
        setException(e);
      } catch (Error e) {
        // Propagate errors up ASAP - our superclass will rethrow the error
        setException(e);
      } finally {
        // Don't pin inputs beyond completion
        function = null;
        inputFuture = null;
        // Allow our get routines to examine outputFuture now.
        outputCreated.countDown();
      }
    }
783   }
784 
785   /**
786    * Creates a new {@code ListenableFuture} whose value is a list containing the
787    * values of all its input futures, if all succeed. If any input fails, the
788    * returned future fails.
789    *
790    * <p>The list of results is in the same order as the input list.
791    *
792    * <p>Canceling this future does not cancel any of the component futures;
793    * however, if any of the provided futures fails or is canceled, this one is,
794    * too.
795    *
796    * @param futures futures to combine
797    * @return a future that provides a list of the results of the component
798    *         futures
799    * @since 10.0
800    */
801   @Beta
802   public static <V> ListenableFuture<List<V>> allAsList(
803       ListenableFuture<? extends V>... futures) {
804     return new ListFuture<V>(ImmutableList.copyOf(futures), true,
805         MoreExecutors.sameThreadExecutor());
806   }
807 
808   /**
809    * Creates a new {@code ListenableFuture} whose value is a list containing the
810    * values of all its input futures, if all succeed. If any input fails, the
811    * returned future fails.
812    *
813    * <p>The list of results is in the same order as the input list.
814    *
815    * <p>Canceling this future does not cancel any of the component futures;
816    * however, if any of the provided futures fails or is canceled, this one is,
817    * too.
818    *
819    * @param futures futures to combine
820    * @return a future that provides a list of the results of the component
821    *         futures
822    * @since 10.0
823    */
824   @Beta
825   public static <V> ListenableFuture<List<V>> allAsList(
826       Iterable<? extends ListenableFuture<? extends V>> futures) {
827     return new ListFuture<V>(ImmutableList.copyOf(futures), true,
828         MoreExecutors.sameThreadExecutor());
829   }
830 
831   /**
832    * Creates a new {@code ListenableFuture} whose value is a list containing the
833    * values of all its successful input futures. The list of results is in the
834    * same order as the input list, and if any of the provided futures fails or
835    * is canceled, its corresponding position will contain {@code null} (which is
836    * indistinguishable from the future having a successful value of
837    * {@code null}).
838    *
839    * @param futures futures to combine
840    * @return a future that provides a list of the results of the component
841    *         futures
842    * @since 10.0
843    */
844   @Beta
845   public static <V> ListenableFuture<List<V>> successfulAsList(
846       ListenableFuture<? extends V>... futures) {
847     return new ListFuture<V>(ImmutableList.copyOf(futures), false,
848         MoreExecutors.sameThreadExecutor());
849   }
850 
851   /**
852    * Creates a new {@code ListenableFuture} whose value is a list containing the
853    * values of all its successful input futures. The list of results is in the
854    * same order as the input list, and if any of the provided futures fails or
855    * is canceled, its corresponding position will contain {@code null} (which is
856    * indistinguishable from the future having a successful value of
857    * {@code null}).
858    *
859    * @param futures futures to combine
860    * @return a future that provides a list of the results of the component
861    *         futures
862    * @since 10.0
863    */
864   @Beta
865   public static <V> ListenableFuture<List<V>> successfulAsList(
866       Iterable<? extends ListenableFuture<? extends V>> futures) {
867     return new ListFuture<V>(ImmutableList.copyOf(futures), false,
868         MoreExecutors.sameThreadExecutor());
869   }
870 
871   /**
872    * Registers separate success and failure callbacks to be run when the {@code
873    * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
874    * complete} or, if the computation is already complete, immediately.
875    *
876    * <p>There is no guaranteed ordering of execution of callbacks, but any
877    * callback added through this method is guaranteed to be called once the
878    * computation is complete.
879    *
880    * Example: <pre> {@code
881    * ListenableFuture<QueryResult> future = ...;
882    * addCallback(future,
883    *     new FutureCallback<QueryResult> {
884    *       public void onSuccess(QueryResult result) {
885    *         storeInCache(result);
886    *       }
887    *       public void onFailure(Throwable t) {
888    *         reportError(t);
889    *       }
890    *     });}</pre>
891    *
892    * <p>Note: This overload of {@code addCallback} is designed for cases in
893    * which the callack is fast and lightweight, as the method does not accept
894    * an {@code Executor} in which to perform the the work. For heavier
895    * callbacks, this overload carries some caveats: First, the thread that the
896    * callback runs in depends on whether the input {@code Future} is done at the
897    * time {@code addCallback} is called and on whether the input {@code Future}
898    * is ever cancelled. In particular, {@code addCallback} may execute the
899    * callback in the thread that calls {@code addCallback} or {@code
900    * Future.cancel}. Second, callbacks may run in an internal thread of the
901    * system responsible for the input {@code Future}, such as an RPC network
902    * thread. Finally, during the execution of a {@code sameThreadExecutor}
903    * callback, all other registered but unexecuted listeners are prevented from
904    * running, even if those listeners are to run in other executors.
905    *
906    * <p>For a more general interface to attach a completion listener to a
907    * {@code Future}, see {@link ListenableFuture#addListener addListener}.
908    *
909    * @param future The future attach the callback to.
910    * @param callback The callback to invoke when {@code future} is completed.
911    * @since 10.0
912    */
913   public static <V> void addCallback(ListenableFuture<V> future,
914       FutureCallback<? super V> callback) {
915     addCallback(future, callback, MoreExecutors.sameThreadExecutor());
916   }
917 
918   /**
919    * Registers separate success and failure callbacks to be run when the {@code
920    * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
921    * complete} or, if the computation is already complete, immediately.
922    *
923    * <p>The callback is run in {@code executor}.
924    * There is no guaranteed ordering of execution of callbacks, but any
925    * callback added through this method is guaranteed to be called once the
926    * computation is complete.
927    *
928    * Example: <pre> {@code
929    * ListenableFuture<QueryResult> future = ...;
930    * Executor e = ...
931    * addCallback(future, e,
932    *     new FutureCallback<QueryResult> {
933    *       public void onSuccess(QueryResult result) {
934    *         storeInCache(result);
935    *       }
936    *       public void onFailure(Throwable t) {
937    *         reportError(t);
938    *       }
939    *     });}</pre>
940    *
941    * When the callback is fast and lightweight consider {@linkplain
942    * Futures#addCallback(ListenableFuture, FutureCallback) the other overload}
943    * or explicit use of {@link MoreExecutors#sameThreadExecutor
944    * sameThreadExecutor}. For heavier callbacks, this choice carries some
945    * caveats: First, the thread that the callback runs in depends on whether
946    * the input {@code Future} is done at the time {@code addCallback} is called
947    * and on whether the input {@code Future} is ever cancelled. In particular,
948    * {@code addCallback} may execute the callback in the thread that calls
949    * {@code addCallback} or {@code Future.cancel}. Second, callbacks may run in
950    * an internal thread of the system responsible for the input {@code Future},
951    * such as an RPC network thread. Finally, during the execution of a {@code
952    * sameThreadExecutor} callback, all other registered but unexecuted
953    * listeners are prevented from running, even if those listeners are to run
954    * in other executors.
955    *
956    * <p>For a more general interface to attach a completion listener to a
957    * {@code Future}, see {@link ListenableFuture#addListener addListener}.
958    *
959    * @param future The future attach the callback to.
960    * @param callback The callback to invoke when {@code future} is completed.
961    * @param executor The executor to run {@code callback} when the future
962    *    completes.
963    * @since 10.0
964    */
965   public static <V> void addCallback(final ListenableFuture<V> future,
966       final FutureCallback<? super V> callback, Executor executor) {
967     Preconditions.checkNotNull(callback);
968     Runnable callbackListener = new Runnable() {
969       @Override
970       public void run() {
971         try {
972           // TODO(user): (Before Guava release), validate that this
973           // is the thing for IE.
974           V value = getUninterruptibly(future);
975           callback.onSuccess(value);
976         } catch (ExecutionException e) {
977           callback.onFailure(e.getCause());
978         } catch (RuntimeException e) {
979           callback.onFailure(e);
980         } catch (Error e) {
981           callback.onFailure(e);
982         }
983       }
984     };
985     future.addListener(callbackListener, executor);
986   }
987 
988   /**
989    * Returns the result of {@link Future#get()}, converting most exceptions to a
990    * new instance of the given checked exception type. This reduces boilerplate
991    * for a common use of {@code Future} in which it is unnecessary to
992    * programmatically distinguish between exception types or to extract other
993    * information from the exception instance.
994    *
995    * <p>Exceptions from {@code Future.get} are treated as follows:
996    * <ul>
997    * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
998    *     {@code X} if the cause is a checked exception, an {@link
999    *     UncheckedExecutionException} if the cause is a {@code
1000    *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1001    *     {@code Error}.
1002    * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1003    *     restoring the interrupt).
1004    * <li>Any {@link CancellationException} is propagated untouched, as is any
1005    *     other {@link RuntimeException} (though {@code get} implementations are
1006    *     discouraged from throwing such exceptions).
1007    * </ul>
1008    *
1009    * The overall principle is to continue to treat every checked exception as a
1010    * checked exception, every unchecked exception as an unchecked exception, and
1011    * every error as an error. In addition, the cause of any {@code
1012    * ExecutionException} is wrapped in order to ensure that the new stack trace
1013    * matches that of the current thread.
1014    *
1015    * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1016    * public constructor that accepts zero or more arguments, all of type {@code
1017    * String} or {@code Throwable} (preferring constructors with at least one
1018    * {@code String}) and calling the constructor via reflection. If the
1019    * exception did not already have a cause, one is set by calling {@link
1020    * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1021    * {@code IllegalArgumentException} is thrown.
1022    *
1023    * @throws X if {@code get} throws any checked exception except for an {@code
1024    *         ExecutionException} whose cause is not itself a checked exception
1025    * @throws UncheckedExecutionException if {@code get} throws an {@code
1026    *         ExecutionException} with a {@code RuntimeException} as its cause
1027    * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1028    *         with an {@code Error} as its cause
1029    * @throws CancellationException if {@code get} throws a {@code
1030    *         CancellationException}
1031    * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1032    *         RuntimeException} or does not have a suitable constructor
1033    * @since 10.0
1034    */
1035   @Beta
1036   public static <V, X extends Exception> V get(
1037       Future<V> future, Class<X> exceptionClass) throws X {
1038     checkNotNull(future);
1039     checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1040         "Futures.get exception type (%s) must not be a RuntimeException",
1041         exceptionClass);
1042     try {
1043       return future.get();
1044     } catch (InterruptedException e) {
1045       currentThread().interrupt();
1046       throw newWithCause(exceptionClass, e);
1047     } catch (ExecutionException e) {
1048       wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1049       throw new AssertionError();
1050     }
1051   }
1052 
1053   /**
1054    * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1055    * exceptions to a new instance of the given checked exception type. This
1056    * reduces boilerplate for a common use of {@code Future} in which it is
1057    * unnecessary to programmatically distinguish between exception types or to
1058    * extract other information from the exception instance.
1059    *
1060    * <p>Exceptions from {@code Future.get} are treated as follows:
1061    * <ul>
1062    * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1063    *     {@code X} if the cause is a checked exception, an {@link
1064    *     UncheckedExecutionException} if the cause is a {@code
1065    *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1066    *     {@code Error}.
1067    * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1068    *     restoring the interrupt).
1069    * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1070    * <li>Any {@link CancellationException} is propagated untouched, as is any
1071    *     other {@link RuntimeException} (though {@code get} implementations are
1072    *     discouraged from throwing such exceptions).
1073    * </ul>
1074    *
1075    * The overall principle is to continue to treat every checked exception as a
1076    * checked exception, every unchecked exception as an unchecked exception, and
1077    * every error as an error. In addition, the cause of any {@code
1078    * ExecutionException} is wrapped in order to ensure that the new stack trace
1079    * matches that of the current thread.
1080    *
1081    * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1082    * public constructor that accepts zero or more arguments, all of type {@code
1083    * String} or {@code Throwable} (preferring constructors with at least one
1084    * {@code String}) and calling the constructor via reflection. If the
1085    * exception did not already have a cause, one is set by calling {@link
1086    * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1087    * {@code IllegalArgumentException} is thrown.
1088    *
1089    * @throws X if {@code get} throws any checked exception except for an {@code
1090    *         ExecutionException} whose cause is not itself a checked exception
1091    * @throws UncheckedExecutionException if {@code get} throws an {@code
1092    *         ExecutionException} with a {@code RuntimeException} as its cause
1093    * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1094    *         with an {@code Error} as its cause
1095    * @throws CancellationException if {@code get} throws a {@code
1096    *         CancellationException}
1097    * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1098    *         RuntimeException} or does not have a suitable constructor
1099    * @since 10.0
1100    */
1101   @Beta
1102   public static <V, X extends Exception> V get(
1103       Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1104       throws X {
1105     checkNotNull(future);
1106     checkNotNull(unit);
1107     checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1108         "Futures.get exception type (%s) must not be a RuntimeException",
1109         exceptionClass);
1110     try {
1111       return future.get(timeout, unit);
1112     } catch (InterruptedException e) {
1113       currentThread().interrupt();
1114       throw newWithCause(exceptionClass, e);
1115     } catch (TimeoutException e) {
1116       throw newWithCause(exceptionClass, e);
1117     } catch (ExecutionException e) {
1118       wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1119       throw new AssertionError();
1120     }
1121   }
1122 
1123   private static <X extends Exception> void wrapAndThrowExceptionOrError(
1124       Throwable cause, Class<X> exceptionClass) throws X {
1125     if (cause instanceof Error) {
1126       throw new ExecutionError((Error) cause);
1127     }
1128     if (cause instanceof RuntimeException) {
1129       throw new UncheckedExecutionException(cause);
1130     }
1131     throw newWithCause(exceptionClass, cause);
1132   }
1133 
1134   /**
1135    * Returns the result of calling {@link Future#get()} uninterruptibly on a
1136    * task known not to throw a checked exception. This makes {@code Future} more
1137    * suitable for lightweight, fast-running tasks that, barring bugs in the
1138    * code, will not fail. This gives it exception-handling behavior similar to
1139    * that of {@code ForkJoinTask.join}.
1140    *
1141    * <p>Exceptions from {@code Future.get} are treated as follows:
1142    * <ul>
1143    * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1144    *     {@link UncheckedExecutionException} (if the cause is an {@code
1145    *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1146    *     Error}).
1147    * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1148    *     call. The interrupt is restored before {@code getUnchecked} returns.
1149    * <li>Any {@link CancellationException} is propagated untouched. So is any
1150    *     other {@link RuntimeException} ({@code get} implementations are
1151    *     discouraged from throwing such exceptions).
1152    * </ul>
1153    *
1154    * The overall principle is to eliminate all checked exceptions: to loop to
1155    * avoid {@code InterruptedException}, to pass through {@code
1156    * CancellationException}, and to wrap any exception from the underlying
1157    * computation in an {@code UncheckedExecutionException} or {@code
1158    * ExecutionError}.
1159    *
1160    * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1161    * {@link Uninterruptibles#getUninterruptibly(Future)}.
1162    *
1163    * @throws UncheckedExecutionException if {@code get} throws an {@code
1164    *         ExecutionException} with an {@code Exception} as its cause
1165    * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1166    *         with an {@code Error} as its cause
1167    * @throws CancellationException if {@code get} throws a {@code
1168    *         CancellationException}
1169    * @since 10.0
1170    */
1171   @Beta
1172   public static <V> V getUnchecked(Future<V> future) {
1173     checkNotNull(future);
1174     try {
1175       return getUninterruptibly(future);
1176     } catch (ExecutionException e) {
1177       wrapAndThrowUnchecked(e.getCause());
1178       throw new AssertionError();
1179     }
1180   }
1181 
1182   private static void wrapAndThrowUnchecked(Throwable cause) {
1183     if (cause instanceof Error) {
1184       throw new ExecutionError((Error) cause);
1185     }
1186     /*
1187      * It's a non-Error, non-Exception Throwable. From my survey of such
1188      * classes, I believe that most users intended to extend Exception, so we'll
1189      * treat it like an Exception.
1190      */
1191     throw new UncheckedExecutionException(cause);
1192   }
1193 
1194   /*
1195    * TODO(user): FutureChecker interface for these to be static methods on? If
1196    * so, refer to it in the (static-method) Futures.get documentation
1197    */
1198 
1199   /*
1200    * Arguably we don't need a timed getUnchecked because any operation slow
1201    * enough to require a timeout is heavyweight enough to throw a checked
1202    * exception and therefore be inappropriate to use with getUnchecked. Further,
1203    * it's not clear that converting the checked TimeoutException to a
1204    * RuntimeException -- especially to an UncheckedExecutionException, since it
1205    * wasn't thrown by the computation -- makes sense, and if we don't convert
1206    * it, the user still has to write a try-catch block.
1207    *
1208    * If you think you would use this method, let us know.
1209    */
1210 
1211   private static <X extends Exception> X newWithCause(
1212       Class<X> exceptionClass, Throwable cause) {
1213     // getConstructors() guarantees this as long as we don't modify the array.
1214     @SuppressWarnings("unchecked")
1215     List<Constructor<X>> constructors =
1216         (List) Arrays.asList(exceptionClass.getConstructors());
1217     for (Constructor<X> constructor : preferringStrings(constructors)) {
1218       @Nullable X instance = newFromConstructor(constructor, cause);
1219       if (instance != null) {
1220         if (instance.getCause() == null) {
1221           instance.initCause(cause);
1222         }
1223         return instance;
1224       }
1225     }
1226     throw new IllegalArgumentException(
1227         "No appropriate constructor for exception of type " + exceptionClass
1228             + " in response to chained exception", cause);
1229   }
1230 
1231   private static <X extends Exception> List<Constructor<X>>
1232       preferringStrings(List<Constructor<X>> constructors) {
1233     return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1234   }
1235 
  /**
   * Orders constructors so that those declaring at least one {@code String}
   * parameter come before those declaring none. Each constructor is mapped
   * to a {@code Boolean} ("has a String parameter"), compared by natural
   * {@code Boolean} order (false before true), and that order is reversed.
   */
  private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
      Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
        @Override public Boolean apply(Constructor<?> input) {
          return asList(input.getParameterTypes()).contains(String.class);
        }
      }).reverse();
1242 
1243   @Nullable private static <X> X newFromConstructor(
1244       Constructor<X> constructor, Throwable cause) {
1245     Class<?>[] paramTypes = constructor.getParameterTypes();
1246     Object[] params = new Object[paramTypes.length];
1247     for (int i = 0; i < paramTypes.length; i++) {
1248       Class<?> paramType = paramTypes[i];
1249       if (paramType.equals(String.class)) {
1250         params[i] = cause.toString();
1251       } else if (paramType.equals(Throwable.class)) {
1252         params[i] = cause;
1253       } else {
1254         return null;
1255       }
1256     }
1257     try {
1258       return constructor.newInstance(params);
1259     } catch (IllegalArgumentException e) {
1260       return null;
1261     } catch (InstantiationException e) {
1262       return null;
1263     } catch (IllegalAccessException e) {
1264       return null;
1265     } catch (InvocationTargetException e) {
1266       return null;
1267     }
1268   }
1269 
1270   /**
1271    * Class that implements {@link #allAsList} and {@link #successfulAsList}.
1272    * The idea is to create a (null-filled) List and register a listener with
1273    * each component future to fill out the value in the List when that future
1274    * completes.
1275    */
1276   private static class ListFuture<V> extends AbstractFuture<List<V>> {
1277     ImmutableList<? extends ListenableFuture<? extends V>> futures;
1278     final boolean allMustSucceed;
1279     final AtomicInteger remaining;
1280     List<V> values;
1281 
1282     /**
1283      * Constructor.
1284      *
1285      * @param futures all the futures to build the list from
1286      * @param allMustSucceed whether a single failure or cancellation should
1287      *        propagate to this future
1288      * @param listenerExecutor used to run listeners on all the passed in
1289      *        futures.
1290      */
1291     ListFuture(
1292         final ImmutableList<? extends ListenableFuture<? extends V>> futures,
1293         final boolean allMustSucceed, final Executor listenerExecutor) {
1294       this.futures = futures;
1295       this.values = Lists.newArrayListWithCapacity(futures.size());
1296       this.allMustSucceed = allMustSucceed;
1297       this.remaining = new AtomicInteger(futures.size());
1298 
1299       init(listenerExecutor);
1300     }
1301 
1302     private void init(final Executor listenerExecutor) {
1303       // First, schedule cleanup to execute when the Future is done.
1304       addListener(new Runnable() {
1305         @Override
1306         public void run() {
1307           // By now the values array has either been set as the Future's value,
1308           // or (in case of failure) is no longer useful.
1309           ListFuture.this.values = null;
1310 
1311           // Let go of the memory held by other futures
1312           ListFuture.this.futures = null;
1313         }
1314       }, MoreExecutors.sameThreadExecutor());
1315 
1316       // Now begin the "real" initialization.
1317 
1318       // Corner case: List is empty.
1319       if (futures.isEmpty()) {
1320         set(Lists.newArrayList(values));
1321         return;
1322       }
1323 
1324       // Populate the results list with null initially.
1325       for (int i = 0; i < futures.size(); ++i) {
1326         values.add(null);
1327       }
1328 
1329       // Register a listener on each Future in the list to update
1330       // the state of this future.
1331       // Note that if all the futures on the list are done prior to completing
1332       // this loop, the last call to addListener() will callback to
1333       // setOneValue(), transitively call our cleanup listener, and set
1334       // this.futures to null.
1335       // We store a reference to futures to avoid the NPE.
1336       ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures;
1337       for (int i = 0; i < localFutures.size(); i++) {
1338         final ListenableFuture<? extends V> listenable = localFutures.get(i);
1339         final int index = i;
1340         listenable.addListener(new Runnable() {
1341           @Override
1342           public void run() {
1343             setOneValue(index, listenable);
1344           }
1345         }, listenerExecutor);
1346       }
1347     }
1348 
1349     /**
1350      * Sets the value at the given index to that of the given future.
1351      */
1352     private void setOneValue(int index, Future<? extends V> future) {
1353       List<V> localValues = values;
1354       if (isDone() || localValues == null) {
1355         // Some other future failed or has been cancelled, causing this one to
1356         // also be cancelled or have an exception set. This should only happen
1357         // if allMustSucceed is true.
1358         checkState(allMustSucceed,
1359             "Future was done before all dependencies completed");
1360         return;
1361       }
1362 
1363       try {
1364         checkState(future.isDone(),
1365             "Tried to set value from future which is not done");
1366         localValues.set(index, getUninterruptibly(future));
1367       } catch (CancellationException e) {
1368         if (allMustSucceed) {
1369           // Set ourselves as cancelled. Let the input futures keep running
1370           // as some of them may be used elsewhere.
1371           // (Currently we don't override interruptTask, so
1372           // mayInterruptIfRunning==false isn't technically necessary.)
1373           cancel(false);
1374         }
1375       } catch (ExecutionException e) {
1376         if (allMustSucceed) {
1377           // As soon as the first one fails, throw the exception up.
1378           // The result of all other inputs is then ignored.
1379           setException(e.getCause());
1380         }
1381       } catch (RuntimeException e) {
1382         if (allMustSucceed) {
1383           setException(e);
1384         }
1385       } catch (Error e) {
1386         // Propagate errors up ASAP - our superclass will rethrow the error
1387         setException(e);
1388       } finally {
1389         int newRemaining = remaining.decrementAndGet();
1390         checkState(newRemaining >= 0, "Less than 0 remaining futures");
1391         if (newRemaining == 0) {
1392           localValues = values;
1393           if (localValues != null) {
1394             set(Lists.newArrayList(localValues));
1395           } else {
1396             checkState(isDone());
1397           }
1398         }
1399       }
1400     }
1401 
1402     @Override
1403     public List<V> get() throws InterruptedException, ExecutionException {
1404       callAllGets();
1405 
1406       // This may still block in spite of the calls above, as the listeners may
1407       // be scheduled for execution in other threads.
1408       return super.get();
1409     }
1410 
1411     /**
1412      * Calls the get method of all dependency futures to work around a bug in
1413      * some ListenableFutures where the listeners aren't called until get() is
1414      * called.
1415      */
1416     private void callAllGets() throws InterruptedException {
1417       List<? extends ListenableFuture<? extends V>> oldFutures = futures;
1418       if (oldFutures != null && !isDone()) {
1419         for (ListenableFuture<? extends V> future : oldFutures) {
1420           // We wait for a little while for the future, but if it's not done,
1421           // we check that no other futures caused a cancellation or failure.
1422           // This can introduce a delay of up to 10ms in reporting an exception.
1423           while (!future.isDone()) {
1424             try {
1425               future.get();
1426             } catch (Error e) {
1427               throw e;
1428             } catch (InterruptedException e) {
1429               throw e;
1430             } catch (Throwable e) {
1431               // ExecutionException / CancellationException / RuntimeException
1432               if (allMustSucceed) {
1433                 return;
1434               } else {
1435                 continue;
1436               }
1437             }
1438           }
1439         }
1440       }
1441     }
1442   }
1443 
1444   /**
1445    * A checked future that uses a function to map from exceptions to the
1446    * appropriate checked type.
1447    */
1448   private static class MappingCheckedFuture<V, X extends Exception> extends
1449       AbstractCheckedFuture<V, X> {
1450 
1451     final Function<Exception, X> mapper;
1452 
1453     MappingCheckedFuture(ListenableFuture<V> delegate,
1454         Function<Exception, X> mapper) {
1455       super(delegate);
1456 
1457       this.mapper = checkNotNull(mapper);
1458     }
1459 
1460     @Override
1461     protected X mapException(Exception e) {
1462       return mapper.apply(e);
1463     }
1464   }
1465 }
1466