1 /* 2 * Copyright (C) 2006 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; 23 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 24 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly; 25 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly; 26 import static java.lang.Thread.currentThread; 27 import static java.util.Arrays.asList; 28 import static java.util.concurrent.TimeUnit.NANOSECONDS; 29 30 import com.google.common.annotations.Beta; 31 import com.google.common.base.Function; 32 import com.google.common.base.Preconditions; 33 import com.google.common.collect.ImmutableList; 34 import com.google.common.collect.Lists; 35 import com.google.common.collect.Ordering; 36 37 import java.lang.reflect.Constructor; 38 import java.lang.reflect.InvocationTargetException; 39 import java.lang.reflect.UndeclaredThrowableException; 40 import java.util.Arrays; 41 import java.util.List; 42 import java.util.concurrent.BlockingQueue; 43 import java.util.concurrent.CancellationException; 44 import java.util.concurrent.CountDownLatch; 45 import java.util.concurrent.ExecutionException; 46 import java.util.concurrent.Executor; 47 import java.util.concurrent.Future; 48 import java.util.concurrent.LinkedBlockingQueue; 49 import java.util.concurrent.TimeUnit; 50 import java.util.concurrent.TimeoutException; 51 import java.util.concurrent.atomic.AtomicInteger; 52 53 import javax.annotation.Nullable; 54 55 /** 56 * Static utility methods pertaining to the {@link Future} interface. 57 * 58 * @author Kevin Bourrillion 59 * @author Nishant Thakkar 60 * @author Sven Mawson 61 * @since 1.0 62 */ 63 @Beta 64 public final class Futures { Futures()65 private Futures() {} 66 67 /** 68 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 69 * and a {@link Function} that maps from {@link Exception} instances into the 70 * appropriate checked type. 71 * 72 * <p>The given mapping function will be applied to an 73 * {@link InterruptedException}, a {@link CancellationException}, or an 74 * {@link ExecutionException} with the actual cause of the exception. 75 * See {@link Future#get()} for details on the exceptions thrown. 76 * 77 * @since 9.0 (source-compatible since 1.0) 78 */ makeChecked( ListenableFuture<V> future, Function<Exception, X> mapper)79 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 80 ListenableFuture<V> future, Function<Exception, X> mapper) { 81 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 82 } 83 84 /** 85 * Creates a {@code ListenableFuture} which has its value set immediately upon 86 * construction. The getters just return the value. This {@code Future} can't 87 * be canceled or timed out and its {@code isDone()} method always returns 88 * {@code true}. 89 */ immediateFuture(@ullable V value)90 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 91 SettableFuture<V> future = SettableFuture.create(); 92 future.set(value); 93 return future; 94 } 95 96 /** 97 * Returns a {@code CheckedFuture} which has its value set immediately upon 98 * construction. 99 * 100 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 101 * method always returns {@code true}. Calling {@code get()} or {@code 102 * checkedGet()} will immediately return the provided value. 103 */ 104 public static <V, X extends Exception> CheckedFuture<V, X> immediateCheckedFuture(@ullable V value)105 immediateCheckedFuture(@Nullable V value) { 106 SettableFuture<V> future = SettableFuture.create(); 107 future.set(value); 108 return Futures.makeChecked(future, new Function<Exception, X>() { 109 @Override 110 public X apply(Exception e) { 111 throw new AssertionError("impossible"); 112 } 113 }); 114 } 115 116 /** 117 * Returns a {@code ListenableFuture} which has an exception set immediately 118 * upon construction. 119 * 120 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 121 * method always returns {@code true}. Calling {@code get()} will immediately 122 * throw the provided {@code Throwable} wrapped in an {@code 123 * ExecutionException}. 124 * 125 * @throws Error if the throwable is an {@link Error}. 126 */ 127 public static <V> ListenableFuture<V> immediateFailedFuture( 128 Throwable throwable) { 129 checkNotNull(throwable); 130 SettableFuture<V> future = SettableFuture.create(); 131 future.setException(throwable); 132 return future; 133 } 134 135 /** 136 * Returns a {@code CheckedFuture} which has an exception set immediately upon 137 * construction. 138 * 139 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 140 * method always returns {@code true}. Calling {@code get()} will immediately 141 * throw the provided {@code Throwable} wrapped in an {@code 142 * ExecutionException}, and calling {@code checkedGet()} will throw the 143 * provided exception itself. 144 * 145 * @throws Error if the throwable is an {@link Error}. 146 */ 147 public static <V, X extends Exception> CheckedFuture<V, X> 148 immediateFailedCheckedFuture(final X exception) { 149 checkNotNull(exception); 150 return makeChecked(Futures.<V>immediateFailedFuture(exception), 151 new Function<Exception, X>() { 152 @Override 153 public X apply(Exception e) { 154 return exception; 155 } 156 }); 157 } 158 159 /** 160 * <p>Returns a new {@code ListenableFuture} whose result is asynchronously 161 * derived from the result of the given {@code Future}. More precisely, the 162 * returned {@code Future} takes its result from a {@code Future} produced by 163 * applying the given {@code Function} to the result of the original {@code 164 * Future}. Example: 165 * 166 * <pre> {@code 167 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 168 * Function<RowKey, ListenableFuture<QueryResult>> queryFunction = 169 * new Function<RowKey, ListenableFuture<QueryResult>>() { 170 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 171 * return dataService.read(rowKey); 172 * } 173 * }; 174 * ListenableFuture<QueryResult> queryFuture = 175 * chain(rowKeyFuture, queryFunction); 176 * }</pre> 177 * 178 * <p>Note: This overload of {@code chain} is designed for cases in which the 179 * work of creating the derived future is fast and lightweight, as the method 180 * does not accept an {@code Executor} in which to perform the the work. For 181 * heavier derivations, this overload carries some caveats: First, the thread 182 * that the derivation runs in depends on whether the input {@code Future} is 183 * done at the time {@code chain} is called. In particular, if called late, 184 * {@code chain} will run the derivation in the thread that called {@code 185 * chain}. Second, derivations may run in an internal thread of the system 186 * responsible for the input {@code Future}, such as an RPC network thread. 187 * Finally, during the execution of a {@code sameThreadExecutor} {@code 188 * chain} function, all other registered but unexecuted listeners are 189 * prevented from running, even if those listeners are to run in other 190 * executors. 191 * 192 * <p>The returned {@code Future} attempts to keep its cancellation state in 193 * sync with that of the input future and that of the future returned by the 194 * chain function. That is, if the returned {@code Future} is cancelled, it 195 * will attempt to cancel the other two, and if either of the other two is 196 * cancelled, the returned {@code Future} will receive a callback in which it 197 * will attempt to cancel itself. 198 * 199 * @param input The future to chain 200 * @param function A function to chain the results of the provided future 201 * to the results of the returned future. This will be run in the thread 202 * that notifies input it is complete. 203 * @return A future that holds result of the chain. 204 * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and 205 * use {@link #transform(ListenableFuture, AsyncFunction)}. This method is 206 * scheduled to be removed from Guava in Guava release 12.0. 207 */ 208 @Deprecated 209 public static <I, O> ListenableFuture<O> chain( 210 ListenableFuture<I> input, 211 Function<? super I, ? extends ListenableFuture<? extends O>> function) { 212 return chain(input, function, MoreExecutors.sameThreadExecutor()); 213 } 214 215 /** 216 * <p>Returns a new {@code ListenableFuture} whose result is asynchronously 217 * derived from the result of the given {@code Future}. More precisely, the 218 * returned {@code Future} takes its result from a {@code Future} produced by 219 * applying the given {@code Function} to the result of the original {@code 220 * Future}. Example: 221 * 222 * <pre> {@code 223 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 224 * Function<RowKey, ListenableFuture<QueryResult>> queryFunction = 225 * new Function<RowKey, ListenableFuture<QueryResult>>() { 226 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 227 * return dataService.read(rowKey); 228 * } 229 * }; 230 * ListenableFuture<QueryResult> queryFuture = 231 * chain(rowKeyFuture, queryFunction, executor); 232 * }</pre> 233 * 234 * <p>The returned {@code Future} attempts to keep its cancellation state in 235 * sync with that of the input future and that of the future returned by the 236 * chain function. That is, if the returned {@code Future} is cancelled, it 237 * will attempt to cancel the other two, and if either of the other two is 238 * cancelled, the returned {@code Future} will receive a callback in which it 239 * will attempt to cancel itself. 240 * 241 * <p>Note: For cases in which the work of creating the derived future is 242 * fast and lightweight, consider {@linkplain Futures#chain(ListenableFuture, 243 * Function) the other overload} or explicit use of {@code 244 * sameThreadExecutor}. For heavier derivations, this choice carries some 245 * caveats: First, the thread that the derivation runs in depends on whether 246 * the input {@code Future} is done at the time {@code chain} is called. In 247 * particular, if called late, {@code chain} will run the derivation in the 248 * thread that called {@code chain}. Second, derivations may run in an 249 * internal thread of the system responsible for the input {@code Future}, 250 * such as an RPC network thread. Finally, during the execution of a {@code 251 * sameThreadExecutor} {@code chain} function, all other registered but 252 * unexecuted listeners are prevented from running, even if those listeners 253 * are to run in other executors. 254 * 255 * @param input The future to chain 256 * @param function A function to chain the results of the provided future 257 * to the results of the returned future. 258 * @param executor Executor to run the function in. 259 * @return A future that holds result of the chain. 260 * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and 261 * use {@link #transform(ListenableFuture, AsyncFunction, Executor)}. This 262 * method is scheduled to be removed from Guava in Guava release 12.0. 263 */ 264 @Deprecated 265 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, 266 final Function<? super I, ? extends ListenableFuture<? extends O>> 267 function, 268 Executor executor) { 269 checkNotNull(function); 270 ChainingListenableFuture<I, O> chain = 271 new ChainingListenableFuture<I, O>(new AsyncFunction<I, O>() { 272 @Override 273 /* 274 * All methods of ListenableFuture are covariant, and we don't expose 275 * the object anywhere that would allow it to be downcast. 276 */ 277 @SuppressWarnings("unchecked") 278 public ListenableFuture<O> apply(I input) { 279 return (ListenableFuture) function.apply(input); 280 } 281 }, input); 282 input.addListener(chain, executor); 283 return chain; 284 } 285 286 /** 287 * Returns a new {@code ListenableFuture} whose result is asynchronously 288 * derived from the result of the given {@code Future}. More precisely, the 289 * returned {@code Future} takes its result from a {@code Future} produced by 290 * applying the given {@code AsyncFunction} to the result of the original 291 * {@code Future}. Example: 292 * 293 * <pre> {@code 294 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 295 * AsyncFunction<RowKey, QueryResult> queryFunction = 296 * new AsyncFunction<RowKey, QueryResult>() { 297 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 298 * return dataService.read(rowKey); 299 * } 300 * }; 301 * ListenableFuture<QueryResult> queryFuture = 302 * transform(rowKeyFuture, queryFunction); 303 * }</pre> 304 * 305 * <p>Note: This overload of {@code transform} is designed for cases in which 306 * the work of creating the derived {@code Future} is fast and lightweight, 307 * as the method does not accept an {@code Executor} in which to perform the 308 * the work. (The created {@code Future} itself need not complete quickly.) 309 * For heavier operations, this overload carries some caveats: First, the 310 * thread that {@code function.apply} runs in depends on whether the input 311 * {@code Future} is done at the time {@code transform} is called. In 312 * particular, if called late, {@code transform} will run the operation in 313 * the thread that called {@code transform}. Second, {@code function.apply} 314 * may run in an internal thread of the system responsible for the input 315 * {@code Future}, such as an RPC network thread. Finally, during the 316 * execution of a {@code sameThreadExecutor} {@code function.apply}, all 317 * other registered but unexecuted listeners are prevented from running, even 318 * if those listeners are to run in other executors. 319 * 320 * <p>The returned {@code Future} attempts to keep its cancellation state in 321 * sync with that of the input future and that of the future returned by the 322 * function. That is, if the returned {@code Future} is cancelled, it will 323 * attempt to cancel the other two, and if either of the other two is 324 * cancelled, the returned {@code Future} will receive a callback in which it 325 * will attempt to cancel itself. 326 * 327 * @param input The future to transform 328 * @param function A function to transform the result of the input future 329 * to the result of the output future 330 * @return A future that holds result of the function (if the input succeeded) 331 * or the original input's failure (if not) 332 * @since 11.0 333 */ 334 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 335 AsyncFunction<? super I, ? extends O> function) { 336 return transform(input, function, MoreExecutors.sameThreadExecutor()); 337 } 338 339 /** 340 * Returns a new {@code ListenableFuture} whose result is asynchronously 341 * derived from the result of the given {@code Future}. More precisely, the 342 * returned {@code Future} takes its result from a {@code Future} produced by 343 * applying the given {@code AsyncFunction} to the result of the original 344 * {@code Future}. Example: 345 * 346 * <pre> {@code 347 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 348 * AsyncFunction<RowKey, QueryResult> queryFunction = 349 * new AsyncFunction<RowKey, QueryResult>() { 350 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 351 * return dataService.read(rowKey); 352 * } 353 * }; 354 * ListenableFuture<QueryResult> queryFuture = 355 * transform(rowKeyFuture, queryFunction, executor); 356 * }</pre> 357 * 358 * <p>The returned {@code Future} attempts to keep its cancellation state in 359 * sync with that of the input future and that of the future returned by the 360 * chain function. That is, if the returned {@code Future} is cancelled, it 361 * will attempt to cancel the other two, and if either of the other two is 362 * cancelled, the returned {@code Future} will receive a callback in which it 363 * will attempt to cancel itself. 364 * 365 * <p>Note: For cases in which the work of creating the derived future is 366 * fast and lightweight, consider {@linkplain 367 * Futures#transform(ListenableFuture, Function) the other overload} or 368 * explicit use of {@code sameThreadExecutor}. For heavier derivations, this 369 * choice carries some caveats: First, the thread that {@code function.apply} 370 * runs in depends on whether the input {@code Future} is done at the time 371 * {@code transform} is called. In particular, if called late, {@code 372 * transform} will run the operation in the thread that called {@code 373 * transform}. Second, {@code function.apply} may run in an internal thread 374 * of the system responsible for the input {@code Future}, such as an RPC 375 * network thread. Finally, during the execution of a {@code 376 * sameThreadExecutor} {@code function.apply}, all other registered but 377 * unexecuted listeners are prevented from running, even if those listeners 378 * are to run in other executors. 379 * 380 * @param input The future to transform 381 * @param function A function to transform the result of the input future 382 * to the result of the output future 383 * @param executor Executor to run the function in. 384 * @return A future that holds result of the function (if the input succeeded) 385 * or the original input's failure (if not) 386 * @since 11.0 387 */ 388 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 389 AsyncFunction<? super I, ? extends O> function, 390 Executor executor) { 391 ChainingListenableFuture<I, O> output = 392 new ChainingListenableFuture<I, O>(function, input); 393 input.addListener(output, executor); 394 return output; 395 } 396 397 /** 398 * Returns a new {@code ListenableFuture} whose result is the product of 399 * applying the given {@code Function} to the result of the given {@code 400 * Future}. Example: 401 * 402 * <pre> {@code 403 * ListenableFuture<QueryResult> queryFuture = ...; 404 * Function<QueryResult, List<Row>> rowsFunction = 405 * new Function<QueryResult, List<Row>>() { 406 * public List<Row> apply(QueryResult queryResult) { 407 * return queryResult.getRows(); 408 * } 409 * }; 410 * ListenableFuture<List<Row>> rowsFuture = 411 * transform(queryFuture, rowsFunction); 412 * }</pre> 413 * 414 * <p>Note: This overload of {@code transform} is designed for cases in which 415 * the transformation is fast and lightweight, as the method does not accept 416 * an {@code Executor} in which to perform the the work. For heavier 417 * transformations, this overload carries some caveats: First, the thread 418 * that the transformation runs in depends on whether the input {@code 419 * Future} is done at the time {@code transform} is called. In particular, if 420 * called late, {@code transform} will perform the transformation in the 421 * thread that called {@code transform}. Second, transformations may run in 422 * an internal thread of the system responsible for the input {@code Future}, 423 * such as an RPC network thread. Finally, during the execution of a {@code 424 * sameThreadExecutor} transformation, all other registered but unexecuted 425 * listeners are prevented from running, even if those listeners are to run 426 * in other executors. 427 * 428 * <p>The returned {@code Future} attempts to keep its cancellation state in 429 * sync with that of the input future. That is, if the returned {@code Future} 430 * is cancelled, it will attempt to cancel the input, and if the input is 431 * cancelled, the returned {@code Future} will receive a callback in which it 432 * will attempt to cancel itself. 433 * 434 * <p>An example use of this method is to convert a serializable object 435 * returned from an RPC into a POJO. 436 * 437 * @param future The future to transform 438 * @param function A Function to transform the results of the provided future 439 * to the results of the returned future. This will be run in the thread 440 * that notifies input it is complete. 441 * @return A future that holds result of the transformation. 442 * @since 9.0 (in 1.0 as {@code compose}) 443 */ 444 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future, 445 final Function<? super I, ? extends O> function) { 446 return transform(future, function, MoreExecutors.sameThreadExecutor()); 447 } 448 449 /** 450 * Returns a new {@code ListenableFuture} whose result is the product of 451 * applying the given {@code Function} to the result of the given {@code 452 * Future}. Example: 453 * 454 * <pre> {@code 455 * ListenableFuture<QueryResult> queryFuture = ...; 456 * Function<QueryResult, List<Row>> rowsFunction = 457 * new Function<QueryResult, List<Row>>() { 458 * public List<Row> apply(QueryResult queryResult) { 459 * return queryResult.getRows(); 460 * } 461 * }; 462 * ListenableFuture<List<Row>> rowsFuture = 463 * transform(queryFuture, rowsFunction, executor); 464 * }</pre> 465 * 466 * <p>The returned {@code Future} attempts to keep its cancellation state in 467 * sync with that of the input future. That is, if the returned {@code Future} 468 * is cancelled, it will attempt to cancel the input, and if the input is 469 * cancelled, the returned {@code Future} will receive a callback in which it 470 * will attempt to cancel itself. 471 * 472 * <p>An example use of this method is to convert a serializable object 473 * returned from an RPC into a POJO. 474 * 475 * <p>Note: For cases in which the transformation is fast and lightweight, 476 * consider {@linkplain Futures#transform(ListenableFuture, Function) the 477 * other overload} or explicit use of {@link 478 * MoreExecutors#sameThreadExecutor}. For heavier transformations, this 479 * choice carries some caveats: First, the thread that the transformation 480 * runs in depends on whether the input {@code Future} is done at the time 481 * {@code transform} is called. In particular, if called late, {@code 482 * transform} will perform the transformation in the thread that called 483 * {@code transform}. Second, transformations may run in an internal thread 484 * of the system responsible for the input {@code Future}, such as an RPC 485 * network thread. Finally, during the execution of a {@code 486 * sameThreadExecutor} transformation, all other registered but unexecuted 487 * listeners are prevented from running, even if those listeners are to run 488 * in other executors. 489 * 490 * @param future The future to transform 491 * @param function A Function to transform the results of the provided future 492 * to the results of the returned future. 493 * @param executor Executor to run the function in. 494 * @return A future that holds result of the transformation. 495 * @since 9.0 (in 2.0 as {@code compose}) 496 */ 497 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future, 498 final Function<? super I, ? extends O> function, Executor executor) { 499 checkNotNull(function); 500 Function<I, ListenableFuture<O>> wrapperFunction 501 = new Function<I, ListenableFuture<O>>() { 502 @Override public ListenableFuture<O> apply(I input) { 503 O output = function.apply(input); 504 return immediateFuture(output); 505 } 506 }; 507 return chain(future, wrapperFunction, executor); 508 } 509 510 /** 511 * Like {@link #transform(ListenableFuture, Function)} except that the 512 * transformation {@code function} is invoked on each call to 513 * {@link Future#get() get()} on the returned future. 514 * 515 * <p>The returned {@code Future} reflects the input's cancellation 516 * state directly, and any attempt to cancel the returned Future is likewise 517 * passed through to the input Future. 518 * 519 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 520 * only apply the timeout to the execution of the underlying {@code Future}, 521 * <em>not</em> to the execution of the transformation function. 522 * 523 * <p>The primary audience of this method is callers of {@code transform} 524 * who don't have a {@code ListenableFuture} available and 525 * do not mind repeated, lazy function evaluation. 526 * 527 * @param future The future to transform 528 * @param function A Function to transform the results of the provided future 529 * to the results of the returned future. 530 * @return A future that returns the result of the transformation. 531 * @since 10.0 532 */ 533 @Beta 534 public static <I, O> Future<O> lazyTransform(final Future<I> future, 535 final Function<? super I, ? extends O> function) { 536 checkNotNull(future); 537 checkNotNull(function); 538 return new Future<O>() { 539 540 @Override 541 public boolean cancel(boolean mayInterruptIfRunning) { 542 return future.cancel(mayInterruptIfRunning); 543 } 544 545 @Override 546 public boolean isCancelled() { 547 return future.isCancelled(); 548 } 549 550 @Override 551 public boolean isDone() { 552 return future.isDone(); 553 } 554 555 @Override 556 public O get() throws InterruptedException, ExecutionException { 557 return applyTransformation(future.get()); 558 } 559 560 @Override 561 public O get(long timeout, TimeUnit unit) 562 throws InterruptedException, ExecutionException, TimeoutException { 563 return applyTransformation(future.get(timeout, unit)); 564 } 565 566 private O applyTransformation(I input) throws ExecutionException { 567 try { 568 return function.apply(input); 569 } catch (Throwable t) { 570 throw new ExecutionException(t); 571 } 572 } 573 }; 574 } 575 576 /** 577 * An implementation of {@code ListenableFuture} that also implements 578 * {@code Runnable} so that it can be used to nest ListenableFutures. 579 * Once the passed-in {@code ListenableFuture} is complete, it calls the 580 * passed-in {@code Function} to generate the result. 581 * 582 * <p>If the function throws any checked exceptions, they should be wrapped 583 * in a {@code UndeclaredThrowableException} so that this class can get 584 * access to the cause. 585 */ 586 private static class ChainingListenableFuture<I, O> 587 extends AbstractFuture<O> implements Runnable { 588 589 private AsyncFunction<? super I, ? extends O> function; 590 private ListenableFuture<? extends I> inputFuture; 591 private volatile ListenableFuture<? extends O> outputFuture; 592 private final BlockingQueue<Boolean> mayInterruptIfRunningChannel = 593 new LinkedBlockingQueue<Boolean>(1); 594 private final CountDownLatch outputCreated = new CountDownLatch(1); 595 596 private ChainingListenableFuture( 597 AsyncFunction<? super I, ? extends O> function, 598 ListenableFuture<? extends I> inputFuture) { 599 this.function = checkNotNull(function); 600 this.inputFuture = checkNotNull(inputFuture); 601 } 602 603 /** 604 * Delegate the get() to the input and output futures, in case 605 * their implementations defer starting computation until their 606 * own get() is invoked. 607 */ 608 @Override 609 public O get() throws InterruptedException, ExecutionException { 610 if (!isDone()) { 611 // Invoking get on the inputFuture will ensure our own run() 612 // method below is invoked as a listener when inputFuture sets 613 // its value. Therefore when get() returns we should then see 614 // the outputFuture be created. 615 ListenableFuture<? extends I> inputFuture = this.inputFuture; 616 if (inputFuture != null) { 617 inputFuture.get(); 618 } 619 620 // If our listener was scheduled to run on an executor we may 621 // need to wait for our listener to finish running before the 622 // outputFuture has been constructed by the function. 623 outputCreated.await(); 624 625 // Like above with the inputFuture, we have a listener on 626 // the outputFuture that will set our own value when its 627 // value is set. Invoking get will ensure the output can 628 // complete and invoke our listener, so that we can later 629 // get the result. 630 ListenableFuture<? extends O> outputFuture = this.outputFuture; 631 if (outputFuture != null) { 632 outputFuture.get(); 633 } 634 } 635 return super.get(); 636 } 637 638 /** 639 * Delegate the get() to the input and output futures, in case 640 * their implementations defer starting computation until their 641 * own get() is invoked. 642 */ 643 @Override 644 public O get(long timeout, TimeUnit unit) throws TimeoutException, 645 ExecutionException, InterruptedException { 646 if (!isDone()) { 647 // Use a single time unit so we can decrease remaining timeout 648 // as we wait for various phases to complete. 649 if (unit != NANOSECONDS) { 650 timeout = NANOSECONDS.convert(timeout, unit); 651 unit = NANOSECONDS; 652 } 653 654 // Invoking get on the inputFuture will ensure our own run() 655 // method below is invoked as a listener when inputFuture sets 656 // its value. Therefore when get() returns we should then see 657 // the outputFuture be created. 658 ListenableFuture<? extends I> inputFuture = this.inputFuture; 659 if (inputFuture != null) { 660 long start = System.nanoTime(); 661 inputFuture.get(timeout, unit); 662 timeout -= Math.max(0, System.nanoTime() - start); 663 } 664 665 // If our listener was scheduled to run on an executor we may 666 // need to wait for our listener to finish running before the 667 // outputFuture has been constructed by the function. 668 long start = System.nanoTime(); 669 if (!outputCreated.await(timeout, unit)) { 670 throw new TimeoutException(); 671 } 672 timeout -= Math.max(0, System.nanoTime() - start); 673 674 // Like above with the inputFuture, we have a listener on 675 // the outputFuture that will set our own value when its 676 // value is set. Invoking get will ensure the output can 677 // complete and invoke our listener, so that we can later 678 // get the result. 679 ListenableFuture<? extends O> outputFuture = this.outputFuture; 680 if (outputFuture != null) { 681 outputFuture.get(timeout, unit); 682 } 683 } 684 return super.get(timeout, unit); 685 } 686 687 @Override 688 public boolean cancel(boolean mayInterruptIfRunning) { 689 /* 690 * Our additional cancellation work needs to occur even if 691 * !mayInterruptIfRunning, so we can't move it into interruptTask(). 692 */ 693 if (super.cancel(mayInterruptIfRunning)) { 694 // This should never block since only one thread is allowed to cancel 695 // this Future. 696 putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning); 697 cancel(inputFuture, mayInterruptIfRunning); 698 cancel(outputFuture, mayInterruptIfRunning); 699 return true; 700 } 701 return false; 702 } 703 704 private void cancel(@Nullable Future<?> future, 705 boolean mayInterruptIfRunning) { 706 if (future != null) { 707 future.cancel(mayInterruptIfRunning); 708 } 709 } 710 711 @Override 712 public void run() { 713 try { 714 I sourceResult; 715 try { 716 sourceResult = getUninterruptibly(inputFuture); 717 } catch (CancellationException e) { 718 // Cancel this future and return. 719 // At this point, inputFuture is cancelled and outputFuture doesn't 720 // exist, so the value of mayInterruptIfRunning is irrelevant. 721 cancel(false); 722 return; 723 } catch (ExecutionException e) { 724 // Set the cause of the exception as this future's exception 725 setException(e.getCause()); 726 return; 727 } 728 729 final ListenableFuture<? extends O> outputFuture = this.outputFuture = 730 function.apply(sourceResult); 731 if (isCancelled()) { 732 // Handles the case where cancel was called while the function was 733 // being applied. 734 // There is a gap in cancel(boolean) between calling sync.cancel() 735 // and storing the value of mayInterruptIfRunning, so this thread 736 // needs to block, waiting for that value. 737 outputFuture.cancel( 738 takeUninterruptibly(mayInterruptIfRunningChannel)); 739 this.outputFuture = null; 740 return; 741 } 742 outputFuture.addListener(new Runnable() { 743 @Override 744 public void run() { 745 try { 746 // Here it would have been nice to have had an 747 // UninterruptibleListenableFuture, but we don't want to start a 748 // combinatorial explosion of interfaces, so we have to make do. 749 set(getUninterruptibly(outputFuture)); 750 } catch (CancellationException e) { 751 // Cancel this future and return. 752 // At this point, inputFuture and outputFuture are done, so the 753 // value of mayInterruptIfRunning is irrelevant. 754 cancel(false); 755 return; 756 } catch (ExecutionException e) { 757 // Set the cause of the exception as this future's exception 758 setException(e.getCause()); 759 } finally { 760 // Don't pin inputs beyond completion 761 ChainingListenableFuture.this.outputFuture = null; 762 } 763 } 764 }, MoreExecutors.sameThreadExecutor()); 765 } catch (UndeclaredThrowableException e) { 766 // Set the cause of the exception as this future's exception 767 setException(e.getCause()); 768 } catch (Exception e) { 769 // This exception is irrelevant in this thread, but useful for the 770 // client 771 setException(e); 772 } catch (Error e) { 773 // Propagate errors up ASAP - our superclass will rethrow the error 774 setException(e); 775 } finally { 776 // Don't pin inputs beyond completion 777 function = null; 778 inputFuture = null; 779 // Allow our get routines to examine outputFuture now. 780 outputCreated.countDown(); 781 } 782 } 783 } 784 785 /** 786 * Creates a new {@code ListenableFuture} whose value is a list containing the 787 * values of all its input futures, if all succeed. If any input fails, the 788 * returned future fails. 789 * 790 * <p>The list of results is in the same order as the input list. 791 * 792 * <p>Canceling this future does not cancel any of the component futures; 793 * however, if any of the provided futures fails or is canceled, this one is, 794 * too. 795 * 796 * @param futures futures to combine 797 * @return a future that provides a list of the results of the component 798 * futures 799 * @since 10.0 800 */ 801 @Beta 802 public static <V> ListenableFuture<List<V>> allAsList( 803 ListenableFuture<? extends V>... futures) { 804 return new ListFuture<V>(ImmutableList.copyOf(futures), true, 805 MoreExecutors.sameThreadExecutor()); 806 } 807 808 /** 809 * Creates a new {@code ListenableFuture} whose value is a list containing the 810 * values of all its input futures, if all succeed. If any input fails, the 811 * returned future fails. 812 * 813 * <p>The list of results is in the same order as the input list. 814 * 815 * <p>Canceling this future does not cancel any of the component futures; 816 * however, if any of the provided futures fails or is canceled, this one is, 817 * too. 818 * 819 * @param futures futures to combine 820 * @return a future that provides a list of the results of the component 821 * futures 822 * @since 10.0 823 */ 824 @Beta 825 public static <V> ListenableFuture<List<V>> allAsList( 826 Iterable<? extends ListenableFuture<? extends V>> futures) { 827 return new ListFuture<V>(ImmutableList.copyOf(futures), true, 828 MoreExecutors.sameThreadExecutor()); 829 } 830 831 /** 832 * Creates a new {@code ListenableFuture} whose value is a list containing the 833 * values of all its successful input futures. The list of results is in the 834 * same order as the input list, and if any of the provided futures fails or 835 * is canceled, its corresponding position will contain {@code null} (which is 836 * indistinguishable from the future having a successful value of 837 * {@code null}). 838 * 839 * @param futures futures to combine 840 * @return a future that provides a list of the results of the component 841 * futures 842 * @since 10.0 843 */ 844 @Beta 845 public static <V> ListenableFuture<List<V>> successfulAsList( 846 ListenableFuture<? extends V>... futures) { 847 return new ListFuture<V>(ImmutableList.copyOf(futures), false, 848 MoreExecutors.sameThreadExecutor()); 849 } 850 851 /** 852 * Creates a new {@code ListenableFuture} whose value is a list containing the 853 * values of all its successful input futures. The list of results is in the 854 * same order as the input list, and if any of the provided futures fails or 855 * is canceled, its corresponding position will contain {@code null} (which is 856 * indistinguishable from the future having a successful value of 857 * {@code null}). 858 * 859 * @param futures futures to combine 860 * @return a future that provides a list of the results of the component 861 * futures 862 * @since 10.0 863 */ 864 @Beta 865 public static <V> ListenableFuture<List<V>> successfulAsList( 866 Iterable<? extends ListenableFuture<? extends V>> futures) { 867 return new ListFuture<V>(ImmutableList.copyOf(futures), false, 868 MoreExecutors.sameThreadExecutor()); 869 } 870 871 /** 872 * Registers separate success and failure callbacks to be run when the {@code 873 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 874 * complete} or, if the computation is already complete, immediately. 875 * 876 * <p>There is no guaranteed ordering of execution of callbacks, but any 877 * callback added through this method is guaranteed to be called once the 878 * computation is complete. 879 * 880 * Example: <pre> {@code 881 * ListenableFuture<QueryResult> future = ...; 882 * addCallback(future, 883 * new FutureCallback<QueryResult> { 884 * public void onSuccess(QueryResult result) { 885 * storeInCache(result); 886 * } 887 * public void onFailure(Throwable t) { 888 * reportError(t); 889 * } 890 * });}</pre> 891 * 892 * <p>Note: This overload of {@code addCallback} is designed for cases in 893 * which the callack is fast and lightweight, as the method does not accept 894 * an {@code Executor} in which to perform the the work. For heavier 895 * callbacks, this overload carries some caveats: First, the thread that the 896 * callback runs in depends on whether the input {@code Future} is done at the 897 * time {@code addCallback} is called and on whether the input {@code Future} 898 * is ever cancelled. In particular, {@code addCallback} may execute the 899 * callback in the thread that calls {@code addCallback} or {@code 900 * Future.cancel}. Second, callbacks may run in an internal thread of the 901 * system responsible for the input {@code Future}, such as an RPC network 902 * thread. Finally, during the execution of a {@code sameThreadExecutor} 903 * callback, all other registered but unexecuted listeners are prevented from 904 * running, even if those listeners are to run in other executors. 905 * 906 * <p>For a more general interface to attach a completion listener to a 907 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 908 * 909 * @param future The future attach the callback to. 910 * @param callback The callback to invoke when {@code future} is completed. 911 * @since 10.0 912 */ 913 public static <V> void addCallback(ListenableFuture<V> future, 914 FutureCallback<? super V> callback) { 915 addCallback(future, callback, MoreExecutors.sameThreadExecutor()); 916 } 917 918 /** 919 * Registers separate success and failure callbacks to be run when the {@code 920 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 921 * complete} or, if the computation is already complete, immediately. 922 * 923 * <p>The callback is run in {@code executor}. 924 * There is no guaranteed ordering of execution of callbacks, but any 925 * callback added through this method is guaranteed to be called once the 926 * computation is complete. 927 * 928 * Example: <pre> {@code 929 * ListenableFuture<QueryResult> future = ...; 930 * Executor e = ... 931 * addCallback(future, e, 932 * new FutureCallback<QueryResult> { 933 * public void onSuccess(QueryResult result) { 934 * storeInCache(result); 935 * } 936 * public void onFailure(Throwable t) { 937 * reportError(t); 938 * } 939 * });}</pre> 940 * 941 * When the callback is fast and lightweight consider {@linkplain 942 * Futures#addCallback(ListenableFuture, FutureCallback) the other overload} 943 * or explicit use of {@link MoreExecutors#sameThreadExecutor 944 * sameThreadExecutor}. For heavier callbacks, this choice carries some 945 * caveats: First, the thread that the callback runs in depends on whether 946 * the input {@code Future} is done at the time {@code addCallback} is called 947 * and on whether the input {@code Future} is ever cancelled. In particular, 948 * {@code addCallback} may execute the callback in the thread that calls 949 * {@code addCallback} or {@code Future.cancel}. Second, callbacks may run in 950 * an internal thread of the system responsible for the input {@code Future}, 951 * such as an RPC network thread. Finally, during the execution of a {@code 952 * sameThreadExecutor} callback, all other registered but unexecuted 953 * listeners are prevented from running, even if those listeners are to run 954 * in other executors. 955 * 956 * <p>For a more general interface to attach a completion listener to a 957 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 958 * 959 * @param future The future attach the callback to. 960 * @param callback The callback to invoke when {@code future} is completed. 961 * @param executor The executor to run {@code callback} when the future 962 * completes. 963 * @since 10.0 964 */ 965 public static <V> void addCallback(final ListenableFuture<V> future, 966 final FutureCallback<? super V> callback, Executor executor) { 967 Preconditions.checkNotNull(callback); 968 Runnable callbackListener = new Runnable() { 969 @Override 970 public void run() { 971 try { 972 // TODO(user): (Before Guava release), validate that this 973 // is the thing for IE. 974 V value = getUninterruptibly(future); 975 callback.onSuccess(value); 976 } catch (ExecutionException e) { 977 callback.onFailure(e.getCause()); 978 } catch (RuntimeException e) { 979 callback.onFailure(e); 980 } catch (Error e) { 981 callback.onFailure(e); 982 } 983 } 984 }; 985 future.addListener(callbackListener, executor); 986 } 987 988 /** 989 * Returns the result of {@link Future#get()}, converting most exceptions to a 990 * new instance of the given checked exception type. This reduces boilerplate 991 * for a common use of {@code Future} in which it is unnecessary to 992 * programmatically distinguish between exception types or to extract other 993 * information from the exception instance. 994 * 995 * <p>Exceptions from {@code Future.get} are treated as follows: 996 * <ul> 997 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 998 * {@code X} if the cause is a checked exception, an {@link 999 * UncheckedExecutionException} if the cause is a {@code 1000 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1001 * {@code Error}. 1002 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1003 * restoring the interrupt). 1004 * <li>Any {@link CancellationException} is propagated untouched, as is any 1005 * other {@link RuntimeException} (though {@code get} implementations are 1006 * discouraged from throwing such exceptions). 1007 * </ul> 1008 * 1009 * The overall principle is to continue to treat every checked exception as a 1010 * checked exception, every unchecked exception as an unchecked exception, and 1011 * every error as an error. In addition, the cause of any {@code 1012 * ExecutionException} is wrapped in order to ensure that the new stack trace 1013 * matches that of the current thread. 1014 * 1015 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1016 * public constructor that accepts zero or more arguments, all of type {@code 1017 * String} or {@code Throwable} (preferring constructors with at least one 1018 * {@code String}) and calling the constructor via reflection. If the 1019 * exception did not already have a cause, one is set by calling {@link 1020 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1021 * {@code IllegalArgumentException} is thrown. 1022 * 1023 * @throws X if {@code get} throws any checked exception except for an {@code 1024 * ExecutionException} whose cause is not itself a checked exception 1025 * @throws UncheckedExecutionException if {@code get} throws an {@code 1026 * ExecutionException} with a {@code RuntimeException} as its cause 1027 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1028 * with an {@code Error} as its cause 1029 * @throws CancellationException if {@code get} throws a {@code 1030 * CancellationException} 1031 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1032 * RuntimeException} or does not have a suitable constructor 1033 * @since 10.0 1034 */ 1035 @Beta 1036 public static <V, X extends Exception> V get( 1037 Future<V> future, Class<X> exceptionClass) throws X { 1038 checkNotNull(future); 1039 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1040 "Futures.get exception type (%s) must not be a RuntimeException", 1041 exceptionClass); 1042 try { 1043 return future.get(); 1044 } catch (InterruptedException e) { 1045 currentThread().interrupt(); 1046 throw newWithCause(exceptionClass, e); 1047 } catch (ExecutionException e) { 1048 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1049 throw new AssertionError(); 1050 } 1051 } 1052 1053 /** 1054 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1055 * exceptions to a new instance of the given checked exception type. This 1056 * reduces boilerplate for a common use of {@code Future} in which it is 1057 * unnecessary to programmatically distinguish between exception types or to 1058 * extract other information from the exception instance. 1059 * 1060 * <p>Exceptions from {@code Future.get} are treated as follows: 1061 * <ul> 1062 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1063 * {@code X} if the cause is a checked exception, an {@link 1064 * UncheckedExecutionException} if the cause is a {@code 1065 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1066 * {@code Error}. 1067 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1068 * restoring the interrupt). 1069 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1070 * <li>Any {@link CancellationException} is propagated untouched, as is any 1071 * other {@link RuntimeException} (though {@code get} implementations are 1072 * discouraged from throwing such exceptions). 1073 * </ul> 1074 * 1075 * The overall principle is to continue to treat every checked exception as a 1076 * checked exception, every unchecked exception as an unchecked exception, and 1077 * every error as an error. In addition, the cause of any {@code 1078 * ExecutionException} is wrapped in order to ensure that the new stack trace 1079 * matches that of the current thread. 1080 * 1081 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1082 * public constructor that accepts zero or more arguments, all of type {@code 1083 * String} or {@code Throwable} (preferring constructors with at least one 1084 * {@code String}) and calling the constructor via reflection. If the 1085 * exception did not already have a cause, one is set by calling {@link 1086 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1087 * {@code IllegalArgumentException} is thrown. 1088 * 1089 * @throws X if {@code get} throws any checked exception except for an {@code 1090 * ExecutionException} whose cause is not itself a checked exception 1091 * @throws UncheckedExecutionException if {@code get} throws an {@code 1092 * ExecutionException} with a {@code RuntimeException} as its cause 1093 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1094 * with an {@code Error} as its cause 1095 * @throws CancellationException if {@code get} throws a {@code 1096 * CancellationException} 1097 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1098 * RuntimeException} or does not have a suitable constructor 1099 * @since 10.0 1100 */ 1101 @Beta 1102 public static <V, X extends Exception> V get( 1103 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 1104 throws X { 1105 checkNotNull(future); 1106 checkNotNull(unit); 1107 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1108 "Futures.get exception type (%s) must not be a RuntimeException", 1109 exceptionClass); 1110 try { 1111 return future.get(timeout, unit); 1112 } catch (InterruptedException e) { 1113 currentThread().interrupt(); 1114 throw newWithCause(exceptionClass, e); 1115 } catch (TimeoutException e) { 1116 throw newWithCause(exceptionClass, e); 1117 } catch (ExecutionException e) { 1118 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1119 throw new AssertionError(); 1120 } 1121 } 1122 1123 private static <X extends Exception> void wrapAndThrowExceptionOrError( 1124 Throwable cause, Class<X> exceptionClass) throws X { 1125 if (cause instanceof Error) { 1126 throw new ExecutionError((Error) cause); 1127 } 1128 if (cause instanceof RuntimeException) { 1129 throw new UncheckedExecutionException(cause); 1130 } 1131 throw newWithCause(exceptionClass, cause); 1132 } 1133 1134 /** 1135 * Returns the result of calling {@link Future#get()} uninterruptibly on a 1136 * task known not to throw a checked exception. This makes {@code Future} more 1137 * suitable for lightweight, fast-running tasks that, barring bugs in the 1138 * code, will not fail. This gives it exception-handling behavior similar to 1139 * that of {@code ForkJoinTask.join}. 1140 * 1141 * <p>Exceptions from {@code Future.get} are treated as follows: 1142 * <ul> 1143 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1144 * {@link UncheckedExecutionException} (if the cause is an {@code 1145 * Exception}) or {@link ExecutionError} (if the cause is an {@code 1146 * Error}). 1147 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 1148 * call. The interrupt is restored before {@code getUnchecked} returns. 1149 * <li>Any {@link CancellationException} is propagated untouched. So is any 1150 * other {@link RuntimeException} ({@code get} implementations are 1151 * discouraged from throwing such exceptions). 1152 * </ul> 1153 * 1154 * The overall principle is to eliminate all checked exceptions: to loop to 1155 * avoid {@code InterruptedException}, to pass through {@code 1156 * CancellationException}, and to wrap any exception from the underlying 1157 * computation in an {@code UncheckedExecutionException} or {@code 1158 * ExecutionError}. 1159 * 1160 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 1161 * {@link Uninterruptibles#getUninterruptibly(Future)}. 1162 * 1163 * @throws UncheckedExecutionException if {@code get} throws an {@code 1164 * ExecutionException} with an {@code Exception} as its cause 1165 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1166 * with an {@code Error} as its cause 1167 * @throws CancellationException if {@code get} throws a {@code 1168 * CancellationException} 1169 * @since 10.0 1170 */ 1171 @Beta 1172 public static <V> V getUnchecked(Future<V> future) { 1173 checkNotNull(future); 1174 try { 1175 return getUninterruptibly(future); 1176 } catch (ExecutionException e) { 1177 wrapAndThrowUnchecked(e.getCause()); 1178 throw new AssertionError(); 1179 } 1180 } 1181 1182 private static void wrapAndThrowUnchecked(Throwable cause) { 1183 if (cause instanceof Error) { 1184 throw new ExecutionError((Error) cause); 1185 } 1186 /* 1187 * It's a non-Error, non-Exception Throwable. From my survey of such 1188 * classes, I believe that most users intended to extend Exception, so we'll 1189 * treat it like an Exception. 1190 */ 1191 throw new UncheckedExecutionException(cause); 1192 } 1193 1194 /* 1195 * TODO(user): FutureChecker interface for these to be static methods on? If 1196 * so, refer to it in the (static-method) Futures.get documentation 1197 */ 1198 1199 /* 1200 * Arguably we don't need a timed getUnchecked because any operation slow 1201 * enough to require a timeout is heavyweight enough to throw a checked 1202 * exception and therefore be inappropriate to use with getUnchecked. Further, 1203 * it's not clear that converting the checked TimeoutException to a 1204 * RuntimeException -- especially to an UncheckedExecutionException, since it 1205 * wasn't thrown by the computation -- makes sense, and if we don't convert 1206 * it, the user still has to write a try-catch block. 1207 * 1208 * If you think you would use this method, let us know. 1209 */ 1210 1211 private static <X extends Exception> X newWithCause( 1212 Class<X> exceptionClass, Throwable cause) { 1213 // getConstructors() guarantees this as long as we don't modify the array. 1214 @SuppressWarnings("unchecked") 1215 List<Constructor<X>> constructors = 1216 (List) Arrays.asList(exceptionClass.getConstructors()); 1217 for (Constructor<X> constructor : preferringStrings(constructors)) { 1218 @Nullable X instance = newFromConstructor(constructor, cause); 1219 if (instance != null) { 1220 if (instance.getCause() == null) { 1221 instance.initCause(cause); 1222 } 1223 return instance; 1224 } 1225 } 1226 throw new IllegalArgumentException( 1227 "No appropriate constructor for exception of type " + exceptionClass 1228 + " in response to chained exception", cause); 1229 } 1230 1231 private static <X extends Exception> List<Constructor<X>> 1232 preferringStrings(List<Constructor<X>> constructors) { 1233 return WITH_STRING_PARAM_FIRST.sortedCopy(constructors); 1234 } 1235 1236 private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST = 1237 Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() { 1238 @Override public Boolean apply(Constructor<?> input) { 1239 return asList(input.getParameterTypes()).contains(String.class); 1240 } 1241 }).reverse(); 1242 1243 @Nullable private static <X> X newFromConstructor( 1244 Constructor<X> constructor, Throwable cause) { 1245 Class<?>[] paramTypes = constructor.getParameterTypes(); 1246 Object[] params = new Object[paramTypes.length]; 1247 for (int i = 0; i < paramTypes.length; i++) { 1248 Class<?> paramType = paramTypes[i]; 1249 if (paramType.equals(String.class)) { 1250 params[i] = cause.toString(); 1251 } else if (paramType.equals(Throwable.class)) { 1252 params[i] = cause; 1253 } else { 1254 return null; 1255 } 1256 } 1257 try { 1258 return constructor.newInstance(params); 1259 } catch (IllegalArgumentException e) { 1260 return null; 1261 } catch (InstantiationException e) { 1262 return null; 1263 } catch (IllegalAccessException e) { 1264 return null; 1265 } catch (InvocationTargetException e) { 1266 return null; 1267 } 1268 } 1269 1270 /** 1271 * Class that implements {@link #allAsList} and {@link #successfulAsList}. 1272 * The idea is to create a (null-filled) List and register a listener with 1273 * each component future to fill out the value in the List when that future 1274 * completes. 1275 */ 1276 private static class ListFuture<V> extends AbstractFuture<List<V>> { 1277 ImmutableList<? extends ListenableFuture<? extends V>> futures; 1278 final boolean allMustSucceed; 1279 final AtomicInteger remaining; 1280 List<V> values; 1281 1282 /** 1283 * Constructor. 1284 * 1285 * @param futures all the futures to build the list from 1286 * @param allMustSucceed whether a single failure or cancellation should 1287 * propagate to this future 1288 * @param listenerExecutor used to run listeners on all the passed in 1289 * futures. 1290 */ 1291 ListFuture( 1292 final ImmutableList<? extends ListenableFuture<? extends V>> futures, 1293 final boolean allMustSucceed, final Executor listenerExecutor) { 1294 this.futures = futures; 1295 this.values = Lists.newArrayListWithCapacity(futures.size()); 1296 this.allMustSucceed = allMustSucceed; 1297 this.remaining = new AtomicInteger(futures.size()); 1298 1299 init(listenerExecutor); 1300 } 1301 1302 private void init(final Executor listenerExecutor) { 1303 // First, schedule cleanup to execute when the Future is done. 1304 addListener(new Runnable() { 1305 @Override 1306 public void run() { 1307 // By now the values array has either been set as the Future's value, 1308 // or (in case of failure) is no longer useful. 1309 ListFuture.this.values = null; 1310 1311 // Let go of the memory held by other futures 1312 ListFuture.this.futures = null; 1313 } 1314 }, MoreExecutors.sameThreadExecutor()); 1315 1316 // Now begin the "real" initialization. 1317 1318 // Corner case: List is empty. 1319 if (futures.isEmpty()) { 1320 set(Lists.newArrayList(values)); 1321 return; 1322 } 1323 1324 // Populate the results list with null initially. 1325 for (int i = 0; i < futures.size(); ++i) { 1326 values.add(null); 1327 } 1328 1329 // Register a listener on each Future in the list to update 1330 // the state of this future. 1331 // Note that if all the futures on the list are done prior to completing 1332 // this loop, the last call to addListener() will callback to 1333 // setOneValue(), transitively call our cleanup listener, and set 1334 // this.futures to null. 1335 // We store a reference to futures to avoid the NPE. 1336 ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures; 1337 for (int i = 0; i < localFutures.size(); i++) { 1338 final ListenableFuture<? extends V> listenable = localFutures.get(i); 1339 final int index = i; 1340 listenable.addListener(new Runnable() { 1341 @Override 1342 public void run() { 1343 setOneValue(index, listenable); 1344 } 1345 }, listenerExecutor); 1346 } 1347 } 1348 1349 /** 1350 * Sets the value at the given index to that of the given future. 1351 */ 1352 private void setOneValue(int index, Future<? extends V> future) { 1353 List<V> localValues = values; 1354 if (isDone() || localValues == null) { 1355 // Some other future failed or has been cancelled, causing this one to 1356 // also be cancelled or have an exception set. This should only happen 1357 // if allMustSucceed is true. 1358 checkState(allMustSucceed, 1359 "Future was done before all dependencies completed"); 1360 return; 1361 } 1362 1363 try { 1364 checkState(future.isDone(), 1365 "Tried to set value from future which is not done"); 1366 localValues.set(index, getUninterruptibly(future)); 1367 } catch (CancellationException e) { 1368 if (allMustSucceed) { 1369 // Set ourselves as cancelled. Let the input futures keep running 1370 // as some of them may be used elsewhere. 1371 // (Currently we don't override interruptTask, so 1372 // mayInterruptIfRunning==false isn't technically necessary.) 1373 cancel(false); 1374 } 1375 } catch (ExecutionException e) { 1376 if (allMustSucceed) { 1377 // As soon as the first one fails, throw the exception up. 1378 // The result of all other inputs is then ignored. 1379 setException(e.getCause()); 1380 } 1381 } catch (RuntimeException e) { 1382 if (allMustSucceed) { 1383 setException(e); 1384 } 1385 } catch (Error e) { 1386 // Propagate errors up ASAP - our superclass will rethrow the error 1387 setException(e); 1388 } finally { 1389 int newRemaining = remaining.decrementAndGet(); 1390 checkState(newRemaining >= 0, "Less than 0 remaining futures"); 1391 if (newRemaining == 0) { 1392 localValues = values; 1393 if (localValues != null) { 1394 set(Lists.newArrayList(localValues)); 1395 } else { 1396 checkState(isDone()); 1397 } 1398 } 1399 } 1400 } 1401 1402 @Override 1403 public List<V> get() throws InterruptedException, ExecutionException { 1404 callAllGets(); 1405 1406 // This may still block in spite of the calls above, as the listeners may 1407 // be scheduled for execution in other threads. 1408 return super.get(); 1409 } 1410 1411 /** 1412 * Calls the get method of all dependency futures to work around a bug in 1413 * some ListenableFutures where the listeners aren't called until get() is 1414 * called. 1415 */ 1416 private void callAllGets() throws InterruptedException { 1417 List<? extends ListenableFuture<? extends V>> oldFutures = futures; 1418 if (oldFutures != null && !isDone()) { 1419 for (ListenableFuture<? extends V> future : oldFutures) { 1420 // We wait for a little while for the future, but if it's not done, 1421 // we check that no other futures caused a cancellation or failure. 1422 // This can introduce a delay of up to 10ms in reporting an exception. 1423 while (!future.isDone()) { 1424 try { 1425 future.get(); 1426 } catch (Error e) { 1427 throw e; 1428 } catch (InterruptedException e) { 1429 throw e; 1430 } catch (Throwable e) { 1431 // ExecutionException / CancellationException / RuntimeException 1432 if (allMustSucceed) { 1433 return; 1434 } else { 1435 continue; 1436 } 1437 } 1438 } 1439 } 1440 } 1441 } 1442 } 1443 1444 /** 1445 * A checked future that uses a function to map from exceptions to the 1446 * appropriate checked type. 1447 */ 1448 private static class MappingCheckedFuture<V, X extends Exception> extends 1449 AbstractCheckedFuture<V, X> { 1450 1451 final Function<Exception, X> mapper; 1452 1453 MappingCheckedFuture(ListenableFuture<V> delegate, 1454 Function<Exception, X> mapper) { 1455 super(delegate); 1456 1457 this.mapper = checkNotNull(mapper); 1458 } 1459 1460 @Override 1461 protected X mapException(Exception e) { 1462 return mapper.apply(e); 1463 } 1464 } 1465 } 1466