1 /* 2 * Copyright (C) 2008 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.Functions.constant; 20 import static com.google.common.base.Functions.identity; 21 import static com.google.common.base.Throwables.propagateIfInstanceOf; 22 import static com.google.common.collect.Iterables.getOnlyElement; 23 import static com.google.common.collect.Lists.newArrayList; 24 import static com.google.common.collect.Sets.intersection; 25 import static com.google.common.truth.Truth.assertThat; 26 import static com.google.common.truth.Truth.assertWithMessage; 27 import static com.google.common.util.concurrent.Futures.allAsList; 28 import static com.google.common.util.concurrent.Futures.catching; 29 import static com.google.common.util.concurrent.Futures.catchingAsync; 30 import static com.google.common.util.concurrent.Futures.getDone; 31 import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; 32 import static com.google.common.util.concurrent.Futures.immediateFailedFuture; 33 import static com.google.common.util.concurrent.Futures.immediateFuture; 34 import static com.google.common.util.concurrent.Futures.immediateVoidFuture; 35 import static com.google.common.util.concurrent.Futures.inCompletionOrder; 36 import static com.google.common.util.concurrent.Futures.lazyTransform; 37 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 38 import static com.google.common.util.concurrent.Futures.scheduleAsync; 39 import static com.google.common.util.concurrent.Futures.submit; 40 import static com.google.common.util.concurrent.Futures.submitAsync; 41 import static com.google.common.util.concurrent.Futures.successfulAsList; 42 import static com.google.common.util.concurrent.Futures.transform; 43 import static com.google.common.util.concurrent.Futures.transformAsync; 44 import static com.google.common.util.concurrent.Futures.whenAllComplete; 45 import static com.google.common.util.concurrent.Futures.whenAllSucceed; 46 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 47 import static com.google.common.util.concurrent.TestPlatform.clearInterrupt; 48 import static com.google.common.util.concurrent.TestPlatform.getDoneFromTimeoutOverload; 49 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; 50 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 51 import static com.google.common.util.concurrent.testing.TestingExecutors.noOpScheduledExecutor; 52 import static java.util.Arrays.asList; 53 import static java.util.concurrent.Executors.newSingleThreadExecutor; 54 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; 55 import static java.util.concurrent.TimeUnit.MILLISECONDS; 56 import static java.util.concurrent.TimeUnit.SECONDS; 57 58 import com.google.common.annotations.GwtCompatible; 59 import com.google.common.annotations.GwtIncompatible; 60 import com.google.common.base.Function; 61 import com.google.common.base.Joiner; 62 import com.google.common.base.Predicate; 63 import com.google.common.collect.ImmutableList; 64 import com.google.common.collect.ImmutableSet; 65 import com.google.common.collect.Iterables; 66 import com.google.common.testing.ClassSanityTester; 67 import com.google.common.testing.GcFinalization; 68 import com.google.common.testing.TestLogHandler; 69 import com.google.errorprone.annotations.CanIgnoreReturnValue; 70 import java.io.FileNotFoundException; 71 import java.io.IOException; 72 import java.lang.ref.WeakReference; 73 import java.util.List; 74 import java.util.Set; 75 import java.util.concurrent.Callable; 76 import java.util.concurrent.CancellationException; 77 import java.util.concurrent.CountDownLatch; 78 import java.util.concurrent.ExecutionException; 79 import java.util.concurrent.Executor; 80 import java.util.concurrent.ExecutorService; 81 import java.util.concurrent.Executors; 82 import java.util.concurrent.Future; 83 import java.util.concurrent.RejectedExecutionException; 84 import java.util.concurrent.ScheduledExecutorService; 85 import java.util.concurrent.TimeUnit; 86 import java.util.concurrent.TimeoutException; 87 import java.util.concurrent.atomic.AtomicBoolean; 88 import java.util.logging.LogRecord; 89 import java.util.logging.Logger; 90 import junit.framework.AssertionFailedError; 91 import junit.framework.TestCase; 92 import org.checkerframework.checker.nullness.qual.Nullable; 93 94 /** 95 * Unit tests for {@link Futures}. 96 * 97 * @author Nishant Thakkar 98 */ 99 @GwtCompatible(emulated = true) 100 public class FuturesTest extends TestCase { 101 private static final Logger aggregateFutureLogger = 102 Logger.getLogger(AggregateFuture.class.getName()); 103 private final TestLogHandler aggregateFutureLogHandler = new TestLogHandler(); 104 105 private static final String DATA1 = "data"; 106 private static final String DATA2 = "more data"; 107 private static final String DATA3 = "most data"; 108 109 @Override setUp()110 public void setUp() throws Exception { 111 super.setUp(); 112 aggregateFutureLogger.addHandler(aggregateFutureLogHandler); 113 } 114 115 @Override tearDown()116 public void tearDown() throws Exception { 117 /* 118 * Clear interrupt for future tests. 119 * 120 * (Ideally we would perform interrupts only in threads that we create, but 121 * it's hard to imagine that anything will break in practice.) 122 */ 123 clearInterrupt(); 124 aggregateFutureLogger.removeHandler(aggregateFutureLogHandler); 125 super.tearDown(); 126 } 127 128 /* 129 * TODO(cpovirk): Use FutureSubject once it's part of core Truth. But be wary of using it when I'm 130 * really testing a Future implementation (e.g., in the case of immediate*Future()). But it's OK 131 * to use in the case of the majority of Futures that are AbstractFutures. 132 */ 133 testImmediateFuture()134 public void testImmediateFuture() throws Exception { 135 ListenableFuture<String> future = immediateFuture(DATA1); 136 137 assertSame(DATA1, getDone(future)); 138 assertSame(DATA1, getDoneFromTimeoutOverload(future)); 139 assertThat(future.toString()).contains("[status=SUCCESS, result=[" + DATA1 + "]]"); 140 } 141 testImmediateVoidFuture()142 public void testImmediateVoidFuture() throws Exception { 143 ListenableFuture<Void> voidFuture = immediateVoidFuture(); 144 145 assertThat(getDone(voidFuture)).isNull(); 146 assertThat(getDoneFromTimeoutOverload(voidFuture)).isNull(); 147 assertThat(voidFuture.toString()).contains("[status=SUCCESS, result=[null]]"); 148 } 149 testImmediateFailedFuture()150 public void testImmediateFailedFuture() throws Exception { 151 Exception exception = new Exception(); 152 ListenableFuture<String> future = immediateFailedFuture(exception); 153 assertThat(future.toString()).endsWith("[status=FAILURE, cause=[" + exception + "]]"); 154 155 try { 156 getDone(future); 157 fail(); 158 } catch (ExecutionException expected) { 159 assertSame(exception, expected.getCause()); 160 } 161 162 try { 163 getDoneFromTimeoutOverload(future); 164 fail(); 165 } catch (ExecutionException expected) { 166 assertSame(exception, expected.getCause()); 167 } 168 } 169 testImmediateFailedFuture_cancellationException()170 public void testImmediateFailedFuture_cancellationException() throws Exception { 171 CancellationException exception = new CancellationException(); 172 ListenableFuture<String> future = immediateFailedFuture(exception); 173 assertFalse(future.isCancelled()); 174 assertThat(future.toString()).endsWith("[status=FAILURE, cause=[" + exception + "]]"); 175 176 try { 177 getDone(future); 178 fail(); 179 } catch (ExecutionException expected) { 180 assertSame(exception, expected.getCause()); 181 } 182 183 try { 184 getDoneFromTimeoutOverload(future); 185 fail(); 186 } catch (ExecutionException expected) { 187 assertSame(exception, expected.getCause()); 188 } 189 } 190 testImmediateCancelledFutureBasic()191 public void testImmediateCancelledFutureBasic() throws Exception { 192 ListenableFuture<String> future = CallerClass1.makeImmediateCancelledFuture(); 193 assertTrue(future.isCancelled()); 194 } 195 196 @GwtIncompatible testImmediateCancelledFutureStack()197 public void testImmediateCancelledFutureStack() throws Exception { 198 ListenableFuture<String> future = CallerClass1.makeImmediateCancelledFuture(); 199 assertTrue(future.isCancelled()); 200 201 try { 202 CallerClass2.get(future); 203 fail(); 204 } catch (CancellationException expected) { 205 // There should be two CancellationException chained together. The outer one should have the 206 // stack trace of where the get() call was made, and the inner should have the stack trace of 207 // where the immediateCancelledFuture() call was made. 208 List<StackTraceElement> stackTrace = ImmutableList.copyOf(expected.getStackTrace()); 209 assertFalse(Iterables.any(stackTrace, hasClassName(CallerClass1.class))); 210 assertTrue(Iterables.any(stackTrace, hasClassName(CallerClass2.class))); 211 212 // See AbstractFutureCancellationCauseTest for how to set causes. 213 assertThat(expected.getCause()).isNull(); 214 } 215 } 216 217 @GwtIncompatible // used only in GwtIncompatible tests hasClassName(final Class<?> clazz)218 private static Predicate<StackTraceElement> hasClassName(final Class<?> clazz) { 219 return new Predicate<StackTraceElement>() { 220 @Override 221 public boolean apply(StackTraceElement element) { 222 return element.getClassName().equals(clazz.getName()); 223 } 224 }; 225 } 226 227 private static final class CallerClass1 { 228 static ListenableFuture<String> makeImmediateCancelledFuture() { 229 return immediateCancelledFuture(); 230 } 231 } 232 233 private static final class CallerClass2 { 234 @CanIgnoreReturnValue 235 static <V> V get(ListenableFuture<V> future) throws ExecutionException, InterruptedException { 236 return getDone(future); 237 } 238 } 239 240 private static class MyException extends Exception {} 241 242 // Class hierarchy for generics sanity checks 243 private static class Foo {} 244 245 private static class FooChild extends Foo {} 246 247 private static class Bar {} 248 249 private static class BarChild extends Bar {} 250 251 public void testTransform_genericsNull() throws Exception { 252 ListenableFuture<?> nullFuture = immediateFuture(null); 253 ListenableFuture<?> transformedFuture = transform(nullFuture, constant(null), directExecutor()); 254 assertNull(getDone(transformedFuture)); 255 } 256 257 public void testTransform_genericsHierarchy() throws Exception { 258 ListenableFuture<FooChild> future = immediateFuture(null); 259 final BarChild barChild = new BarChild(); 260 Function<Foo, BarChild> function = 261 new Function<Foo, BarChild>() { 262 @Override 263 public BarChild apply(Foo unused) { 264 return barChild; 265 } 266 }; 267 Bar bar = getDone(transform(future, function, directExecutor())); 268 assertSame(barChild, bar); 269 } 270 271 /* 272 * Android does not handle this stack overflow gracefully... though somehow some other 273 * stack-overflow tests work. It must depend on the exact place the error occurs. 274 */ 275 @AndroidIncompatible 276 @GwtIncompatible // StackOverflowError 277 public void testTransform_StackOverflow() throws Exception { 278 { 279 /* 280 * Initialize all relevant classes before running the test, which may otherwise poison any 281 * classes it is trying to load during its stack overflow. 282 */ 283 SettableFuture<Object> root = SettableFuture.create(); 284 ListenableFuture<Object> unused = transform(root, identity(), directExecutor()); 285 root.set("foo"); 286 } 287 288 SettableFuture<Object> root = SettableFuture.create(); 289 ListenableFuture<Object> output = root; 290 for (int i = 0; i < 10000; i++) { 291 output = transform(output, identity(), directExecutor()); 292 } 293 try { 294 root.set("foo"); 295 fail(); 296 } catch (StackOverflowError expected) { 297 } 298 } 299 300 public void testTransform_ErrorAfterCancellation() throws Exception { 301 class Transformer implements Function<Object, Object> { 302 ListenableFuture<Object> output; 303 304 @Override 305 public Object apply(Object input) { 306 output.cancel(false); 307 throw new MyError(); 308 } 309 } 310 Transformer transformer = new Transformer(); 311 SettableFuture<Object> input = SettableFuture.create(); 312 313 ListenableFuture<Object> output = transform(input, transformer, directExecutor()); 314 transformer.output = output; 315 316 input.set("foo"); 317 assertTrue(output.isCancelled()); 318 } 319 320 public void testTransform_ExceptionAfterCancellation() throws Exception { 321 class Transformer implements Function<Object, Object> { 322 ListenableFuture<Object> output; 323 324 @Override 325 public Object apply(Object input) { 326 output.cancel(false); 327 throw new MyRuntimeException(); 328 } 329 } 330 Transformer transformer = new Transformer(); 331 SettableFuture<Object> input = SettableFuture.create(); 332 333 ListenableFuture<Object> output = transform(input, transformer, directExecutor()); 334 transformer.output = output; 335 336 input.set("foo"); 337 assertTrue(output.isCancelled()); 338 } 339 340 public void testTransform_getThrowsRuntimeException() throws Exception { 341 ListenableFuture<Object> input = 342 UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); 343 344 ListenableFuture<Object> output = transform(input, identity(), directExecutor()); 345 try { 346 getDone(output); 347 fail(); 348 } catch (ExecutionException expected) { 349 assertThat(expected.getCause()).isInstanceOf(MyRuntimeException.class); 350 } 351 } 352 353 public void testTransform_getThrowsError() throws Exception { 354 ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); 355 356 ListenableFuture<Object> output = transform(input, identity(), directExecutor()); 357 try { 358 getDone(output); 359 fail(); 360 } catch (ExecutionException expected) { 361 assertThat(expected.getCause()).isInstanceOf(MyError.class); 362 } 363 } 364 365 public void testTransform_listenerThrowsError() throws Exception { 366 SettableFuture<Object> input = SettableFuture.create(); 367 ListenableFuture<Object> output = transform(input, identity(), directExecutor()); 368 369 output.addListener( 370 new Runnable() { 371 @Override 372 public void run() { 373 throw new MyError(); 374 } 375 }, 376 directExecutor()); 377 try { 378 input.set("foo"); 379 fail(); 380 } catch (MyError expected) { 381 } 382 } 383 384 public void testTransformAsync_cancelPropagatesToInput() throws Exception { 385 SettableFuture<Foo> input = SettableFuture.create(); 386 AsyncFunction<Foo, Bar> function = 387 new AsyncFunction<Foo, Bar>() { 388 @Override 389 public ListenableFuture<Bar> apply(Foo unused) { 390 throw new AssertionFailedError("Unexpeted call to apply."); 391 } 392 }; 393 assertTrue(transformAsync(input, function, directExecutor()).cancel(false)); 394 assertTrue(input.isCancelled()); 395 assertFalse(input.wasInterrupted()); 396 } 397 398 public void testTransformAsync_interruptPropagatesToInput() throws Exception { 399 SettableFuture<Foo> input = SettableFuture.create(); 400 AsyncFunction<Foo, Bar> function = 401 new AsyncFunction<Foo, Bar>() { 402 @Override 403 public ListenableFuture<Bar> apply(Foo unused) { 404 throw new AssertionFailedError("Unexpeted call to apply."); 405 } 406 }; 407 assertTrue(transformAsync(input, function, directExecutor()).cancel(true)); 408 assertTrue(input.isCancelled()); 409 assertTrue(input.wasInterrupted()); 410 } 411 412 @GwtIncompatible // threads 413 414 public void testTransformAsync_interruptPropagatesToTransformingThread() throws Exception { 415 SettableFuture<String> input = SettableFuture.create(); 416 final CountDownLatch inFunction = new CountDownLatch(1); 417 final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); 418 final CountDownLatch gotException = new CountDownLatch(1); 419 AsyncFunction<String, String> function = 420 new AsyncFunction<String, String>() { 421 @Override 422 public ListenableFuture<String> apply(String s) throws Exception { 423 inFunction.countDown(); 424 try { 425 shouldCompleteFunction.await(); 426 } catch (InterruptedException expected) { 427 gotException.countDown(); 428 throw expected; 429 } 430 return immediateFuture("a"); 431 } 432 }; 433 434 ListenableFuture<String> futureResult = 435 transformAsync(input, function, newSingleThreadExecutor()); 436 437 input.set("value"); 438 inFunction.await(); 439 futureResult.cancel(true); 440 shouldCompleteFunction.countDown(); 441 try { 442 futureResult.get(); 443 fail(); 444 } catch (CancellationException expected) { 445 } 446 // TODO(cpovirk): implement interruption, updating this test: 447 // https://github.com/google/guava/issues/1989 448 assertEquals(1, gotException.getCount()); 449 // gotException.await(); 450 } 451 452 public void testTransformAsync_cancelPropagatesToAsyncOutput() throws Exception { 453 ListenableFuture<Foo> immediate = immediateFuture(new Foo()); 454 final SettableFuture<Bar> secondary = SettableFuture.create(); 455 AsyncFunction<Foo, Bar> function = 456 new AsyncFunction<Foo, Bar>() { 457 @Override 458 public ListenableFuture<Bar> apply(Foo unused) { 459 return secondary; 460 } 461 }; 462 assertTrue(transformAsync(immediate, function, directExecutor()).cancel(false)); 463 assertTrue(secondary.isCancelled()); 464 assertFalse(secondary.wasInterrupted()); 465 } 466 467 public void testTransformAsync_interruptPropagatesToAsyncOutput() throws Exception { 468 ListenableFuture<Foo> immediate = immediateFuture(new Foo()); 469 final SettableFuture<Bar> secondary = SettableFuture.create(); 470 AsyncFunction<Foo, Bar> function = 471 new AsyncFunction<Foo, Bar>() { 472 @Override 473 public ListenableFuture<Bar> apply(Foo unused) { 474 return secondary; 475 } 476 }; 477 assertTrue(transformAsync(immediate, function, directExecutor()).cancel(true)); 478 assertTrue(secondary.isCancelled()); 479 assertTrue(secondary.wasInterrupted()); 480 } 481 482 public void testTransformAsync_inputCancelButNotInterruptPropagatesToOutput() throws Exception { 483 SettableFuture<Foo> f1 = SettableFuture.create(); 484 final SettableFuture<Bar> secondary = SettableFuture.create(); 485 AsyncFunction<Foo, Bar> function = 486 new AsyncFunction<Foo, Bar>() { 487 @Override 488 public ListenableFuture<Bar> apply(Foo unused) { 489 return secondary; 490 } 491 }; 492 ListenableFuture<Bar> f2 = transformAsync(f1, function, directExecutor()); 493 f1.cancel(true); 494 assertTrue(f2.isCancelled()); 495 /* 496 * We might like to propagate interruption, too, but it's not clear that it matters. For now, we 497 * test for the behavior that we have today. 498 */ 499 assertFalse(((AbstractFuture<?>) f2).wasInterrupted()); 500 } 501 502 /* 503 * Android does not handle this stack overflow gracefully... though somehow some other 504 * stack-overflow tests work. It must depend on the exact place the error occurs. 505 */ 506 @AndroidIncompatible 507 @GwtIncompatible // StackOverflowError 508 public void testTransformAsync_StackOverflow() throws Exception { 509 { 510 /* 511 * Initialize all relevant classes before running the test, which may otherwise poison any 512 * classes it is trying to load during its stack overflow. 513 */ 514 SettableFuture<Object> root = SettableFuture.create(); 515 ListenableFuture<Object> unused = transformAsync(root, asyncIdentity(), directExecutor()); 516 root.set("foo"); 517 } 518 519 SettableFuture<Object> root = SettableFuture.create(); 520 ListenableFuture<Object> output = root; 521 for (int i = 0; i < 10000; i++) { 522 output = transformAsync(output, asyncIdentity(), directExecutor()); 523 } 524 try { 525 root.set("foo"); 526 fail(); 527 } catch (StackOverflowError expected) { 528 } 529 } 530 531 public void testTransformAsync_ErrorAfterCancellation() throws Exception { 532 class Transformer implements AsyncFunction<Object, Object> { 533 ListenableFuture<Object> output; 534 535 @Override 536 public ListenableFuture<Object> apply(Object input) { 537 output.cancel(false); 538 throw new MyError(); 539 } 540 } 541 Transformer transformer = new Transformer(); 542 SettableFuture<Object> input = SettableFuture.create(); 543 544 ListenableFuture<Object> output = transformAsync(input, transformer, directExecutor()); 545 transformer.output = output; 546 547 input.set("foo"); 548 assertTrue(output.isCancelled()); 549 } 550 551 public void testTransformAsync_ExceptionAfterCancellation() throws Exception { 552 class Transformer implements AsyncFunction<Object, Object> { 553 ListenableFuture<Object> output; 554 555 @Override 556 public ListenableFuture<Object> apply(Object input) { 557 output.cancel(false); 558 throw new MyRuntimeException(); 559 } 560 } 561 Transformer transformer = new Transformer(); 562 SettableFuture<Object> input = SettableFuture.create(); 563 564 ListenableFuture<Object> output = transformAsync(input, transformer, directExecutor()); 565 transformer.output = output; 566 567 input.set("foo"); 568 assertTrue(output.isCancelled()); 569 } 570 571 public void testTransformAsync_getThrowsRuntimeException() throws Exception { 572 ListenableFuture<Object> input = 573 UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); 574 575 ListenableFuture<Object> output = transformAsync(input, asyncIdentity(), directExecutor()); 576 try { 577 getDone(output); 578 fail(); 579 } catch (ExecutionException expected) { 580 assertThat(expected.getCause()).isInstanceOf(MyRuntimeException.class); 581 } 582 } 583 584 public void testTransformAsync_getThrowsError() throws Exception { 585 ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); 586 587 ListenableFuture<Object> output = transformAsync(input, asyncIdentity(), directExecutor()); 588 try { 589 getDone(output); 590 fail(); 591 } catch (ExecutionException expected) { 592 assertThat(expected.getCause()).isInstanceOf(MyError.class); 593 } 594 } 595 596 public void testTransformAsync_listenerThrowsError() throws Exception { 597 SettableFuture<Object> input = SettableFuture.create(); 598 ListenableFuture<Object> output = transformAsync(input, asyncIdentity(), directExecutor()); 599 600 output.addListener( 601 new Runnable() { 602 @Override 603 public void run() { 604 throw new MyError(); 605 } 606 }, 607 directExecutor()); 608 try { 609 input.set("foo"); 610 fail(); 611 } catch (MyError expected) { 612 } 613 } 614 615 public void testTransform_rejectionPropagatesToOutput() throws Exception { 616 SettableFuture<Foo> input = SettableFuture.create(); 617 Function<Foo, Foo> identity = identity(); 618 ListenableFuture<Foo> transformed = transform(input, identity, REJECTING_EXECUTOR); 619 input.set(new Foo()); 620 try { 621 getDone(transformed); 622 fail(); 623 } catch (ExecutionException expected) { 624 assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); 625 } 626 } 627 628 public void testTransformAsync_rejectionPropagatesToOutput() throws Exception { 629 SettableFuture<Foo> input = SettableFuture.create(); 630 AsyncFunction<Foo, Foo> asyncIdentity = asyncIdentity(); 631 ListenableFuture<Foo> transformed = transformAsync(input, asyncIdentity, REJECTING_EXECUTOR); 632 input.set(new Foo()); 633 try { 634 getDone(transformed); 635 fail(); 636 } catch (ExecutionException expected) { 637 assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); 638 } 639 } 640 641 /** Tests that the function is invoked only once, even if it throws an exception. */ 642 public void testTransformValueRemainsMemoized() throws Exception { 643 class Holder { 644 645 int value = 2; 646 } 647 final Holder holder = new Holder(); 648 649 // This function adds the holder's value to the input value. 650 Function<Integer, Integer> adder = 651 new Function<Integer, Integer>() { 652 @Override 653 public Integer apply(Integer from) { 654 return from + holder.value; 655 } 656 }; 657 658 // Since holder.value is 2, applying 4 should yield 6. 659 assertEquals(6, adder.apply(4).intValue()); 660 661 ListenableFuture<Integer> immediateFuture = immediateFuture(4); 662 Future<Integer> transformedFuture = transform(immediateFuture, adder, directExecutor()); 663 664 // The composed future also yields 6. 665 assertEquals(6, getDone(transformedFuture).intValue()); 666 667 // Repeated calls yield the same value even though the function's behavior 668 // changes 669 holder.value = 3; 670 assertEquals(6, getDone(transformedFuture).intValue()); 671 assertEquals(7, adder.apply(4).intValue()); 672 673 // Once more, with feeling. 674 holder.value = 4; 675 assertEquals(6, getDone(transformedFuture).intValue()); 676 assertEquals(8, adder.apply(4).intValue()); 677 678 // Memoized get also retains the value. 679 assertEquals(6, getDoneFromTimeoutOverload(transformedFuture).intValue()); 680 681 // Unsurprisingly, recomposing the future will return an updated value. 682 assertEquals(8, getDone(transform(immediateFuture, adder, directExecutor())).intValue()); 683 684 // Repeating, with the timeout version 685 assertEquals( 686 8, 687 getDoneFromTimeoutOverload(transform(immediateFuture, adder, directExecutor())).intValue()); 688 } 689 690 static class MyError extends Error {} 691 692 static class MyRuntimeException extends RuntimeException {} 693 694 /** 695 * Test that the function is invoked only once, even if it throws an exception. Also, test that 696 * that function's result is wrapped in an ExecutionException. 697 */ 698 @GwtIncompatible // reflection 699 public void testTransformExceptionRemainsMemoized() throws Throwable { 700 // We need to test with two input futures since ExecutionList.execute 701 // doesn't catch Errors and we cannot depend on the order that our 702 // transformations run. (So it is possible that the Error being thrown 703 // could prevent our second transformations from running). 704 SettableFuture<Integer> exceptionInput = SettableFuture.create(); 705 ListenableFuture<Integer> exceptionComposedFuture = 706 transform(exceptionInput, newOneTimeExceptionThrower(), directExecutor()); 707 exceptionInput.set(0); 708 runGetIdempotencyTest(exceptionComposedFuture, MyRuntimeException.class); 709 710 SettableFuture<Integer> errorInput = SettableFuture.create(); 711 ListenableFuture<Integer> errorComposedFuture = 712 transform(errorInput, newOneTimeErrorThrower(), directExecutor()); 713 errorInput.set(0); 714 715 runGetIdempotencyTest(errorComposedFuture, MyError.class); 716 717 /* 718 * Try again when the input's value is already filled in, since the flow is 719 * slightly different in that case. 720 */ 721 exceptionComposedFuture = 722 transform(exceptionInput, newOneTimeExceptionThrower(), directExecutor()); 723 runGetIdempotencyTest(exceptionComposedFuture, MyRuntimeException.class); 724 725 runGetIdempotencyTest( 726 transform(errorInput, newOneTimeErrorThrower(), directExecutor()), MyError.class); 727 runGetIdempotencyTest(errorComposedFuture, MyError.class); 728 } 729 730 @GwtIncompatible // reflection 731 private static void runGetIdempotencyTest( 732 Future<Integer> transformedFuture, Class<? extends Throwable> expectedExceptionClass) 733 throws Throwable { 734 for (int i = 0; i < 5; i++) { 735 try { 736 getDone(transformedFuture); 737 fail(); 738 } catch (ExecutionException expected) { 739 if (!expectedExceptionClass.isInstance(expected.getCause())) { 740 throw expected.getCause(); 741 } 742 } 743 } 744 } 745 746 @GwtIncompatible // used only in GwtIncompatible tests 747 private static Function<Integer, Integer> newOneTimeExceptionThrower() { 748 return new Function<Integer, Integer>() { 749 int calls = 0; 750 751 @Override 752 public Integer apply(Integer from) { 753 if (++calls > 1) { 754 fail(); 755 } 756 throw new MyRuntimeException(); 757 } 758 }; 759 } 760 761 @GwtIncompatible // used only in GwtIncompatible tests 762 private static Function<Integer, Integer> newOneTimeErrorThrower() { 763 return new Function<Integer, Integer>() { 764 int calls = 0; 765 766 @Override 767 public Integer apply(Integer from) { 768 if (++calls > 1) { 769 fail(); 770 } 771 throw new MyError(); 772 } 773 }; 774 } 775 776 // TODO(cpovirk): top-level class? 777 static class ExecutorSpy implements Executor { 778 779 Executor delegate; 780 boolean wasExecuted; 781 782 public ExecutorSpy(Executor delegate) { 783 this.delegate = delegate; 784 } 785 786 @Override 787 public void execute(Runnable command) { 788 delegate.execute(command); 789 wasExecuted = true; 790 } 791 } 792 793 public void testTransform_Executor() throws Exception { 794 Object value = new Object(); 795 ExecutorSpy spy = new ExecutorSpy(directExecutor()); 796 797 assertFalse(spy.wasExecuted); 798 799 ListenableFuture<Object> future = transform(immediateFuture(value), identity(), spy); 800 801 assertSame(value, getDone(future)); 802 assertTrue(spy.wasExecuted); 803 } 804 805 @GwtIncompatible // Threads 806 807 public void testTransformAsync_functionToString() throws Exception { 808 final CountDownLatch functionCalled = new CountDownLatch(1); 809 final CountDownLatch functionBlocking = new CountDownLatch(1); 810 AsyncFunction<Object, Object> function = 811 tagged( 812 "Called my toString", 813 new AsyncFunction<Object, Object>() { 814 @Override 815 public ListenableFuture<Object> apply(Object input) throws Exception { 816 functionCalled.countDown(); 817 functionBlocking.await(); 818 return immediateFuture(null); 819 } 820 }); 821 822 ExecutorService executor = Executors.newSingleThreadExecutor(); 823 try { 824 ListenableFuture<?> output = 825 Futures.transformAsync(immediateFuture(null), function, executor); 826 functionCalled.await(); 827 assertThat(output.toString()).contains(function.toString()); 828 } finally { 829 functionBlocking.countDown(); 830 executor.shutdown(); 831 } 832 } 833 834 @GwtIncompatible // lazyTransform 835 public void testLazyTransform() throws Exception { 836 FunctionSpy<Object, String> spy = new FunctionSpy<>(constant("bar")); 837 Future<String> input = immediateFuture("foo"); 838 Future<String> transformed = lazyTransform(input, spy); 839 spy.verifyCallCount(0); 840 assertEquals("bar", getDone(transformed)); 841 spy.verifyCallCount(1); 842 assertEquals("bar", getDone(transformed)); 843 spy.verifyCallCount(2); 844 } 845 846 @GwtIncompatible // lazyTransform 847 public void testLazyTransform_exception() throws Exception { 848 final RuntimeException exception = new RuntimeException("deliberate"); 849 Function<Integer, String> function = 850 new Function<Integer, String>() { 851 @Override 852 public String apply(Integer input) { 853 throw exception; 854 } 855 }; 856 Future<String> transformed = lazyTransform(immediateFuture(1), function); 857 try { 858 getDone(transformed); 859 fail(); 860 } catch (ExecutionException expected) { 861 assertSame(exception, expected.getCause()); 862 } 863 try { 864 getDoneFromTimeoutOverload(transformed); 865 fail(); 866 } catch (ExecutionException expected) { 867 assertSame(exception, expected.getCause()); 868 } 869 } 870 871 private static class FunctionSpy<I, O> implements Function<I, O> { 872 private int applyCount; 873 private final Function<I, O> delegate; 874 875 public FunctionSpy(Function<I, O> delegate) { 876 this.delegate = delegate; 877 } 878 879 @Override 880 public O apply(I input) { 881 applyCount++; 882 return delegate.apply(input); 883 } 884 885 void verifyCallCount(int expected) { 886 assertThat(applyCount).isEqualTo(expected); 887 } 888 } 889 890 private static <X extends Throwable, V> Function<X, V> unexpectedFunction() { 891 return new Function<X, V>() { 892 @Override 893 public V apply(X t) { 894 throw newAssertionError("Unexpected fallback", t); 895 } 896 }; 897 } 898 899 private static class AsyncFunctionSpy<X extends Throwable, V> implements AsyncFunction<X, V> { 900 private int count; 901 private final AsyncFunction<X, V> delegate; 902 903 public AsyncFunctionSpy(AsyncFunction<X, V> delegate) { 904 this.delegate = delegate; 905 } 906 907 @Override 908 public final ListenableFuture<V> apply(X t) throws Exception { 909 count++; 910 return delegate.apply(t); 911 } 912 913 void verifyCallCount(int expected) { 914 assertThat(count).isEqualTo(expected); 915 } 916 } 917 918 private static <I, O> FunctionSpy<I, O> spy(Function<I, O> delegate) { 919 return new FunctionSpy<>(delegate); 920 } 921 922 private static <X extends Throwable, V> AsyncFunctionSpy<X, V> spy(AsyncFunction<X, V> delegate) { 923 return new AsyncFunctionSpy<>(delegate); 924 } 925 926 private static <X extends Throwable, V> AsyncFunction<X, V> unexpectedAsyncFunction() { 927 return new AsyncFunction<X, V>() { 928 @Override 929 public ListenableFuture<V> apply(X t) { 930 throw newAssertionError("Unexpected fallback", t); 931 } 932 }; 933 } 934 935 /** Alternative to AssertionError(String, Throwable), which doesn't exist in GWT 2.6.1. */ 936 private static AssertionError newAssertionError(String message, Throwable cause) { 937 AssertionError e = new AssertionError(message); 938 e.initCause(cause); 939 return e; 940 } 941 942 // catchingAsync tests cloned from the old withFallback tests: 943 944 public void testCatchingAsync_inputDoesNotRaiseException() throws Exception { 945 AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); 946 ListenableFuture<Integer> originalFuture = immediateFuture(7); 947 ListenableFuture<Integer> faultTolerantFuture = 948 catchingAsync(originalFuture, Throwable.class, fallback, directExecutor()); 949 assertEquals(7, getDone(faultTolerantFuture).intValue()); 950 } 951 952 public void testCatchingAsync_inputRaisesException() throws Exception { 953 final RuntimeException raisedException = new RuntimeException(); 954 AsyncFunctionSpy<Throwable, Integer> fallback = 955 spy( 956 new AsyncFunction<Throwable, Integer>() { 957 @Override 958 public ListenableFuture<Integer> apply(Throwable t) throws Exception { 959 assertThat(t).isSameInstanceAs(raisedException); 960 return immediateFuture(20); 961 } 962 }); 963 ListenableFuture<Integer> failingFuture = immediateFailedFuture(raisedException); 964 ListenableFuture<Integer> faultTolerantFuture = 965 catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); 966 assertEquals(20, getDone(faultTolerantFuture).intValue()); 967 fallback.verifyCallCount(1); 968 } 969 970 @GwtIncompatible // non-Throwable exceptionType 971 public void testCatchingAsync_inputCancelledWithoutFallback() throws Exception { 972 AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); 973 ListenableFuture<Integer> originalFuture = immediateCancelledFuture(); 974 ListenableFuture<Integer> faultTolerantFuture = 975 catchingAsync(originalFuture, IOException.class, fallback, directExecutor()); 976 assertTrue(faultTolerantFuture.isCancelled()); 977 } 978 979 public void testCatchingAsync_fallbackGeneratesRuntimeException() throws Exception { 980 RuntimeException expectedException = new RuntimeException(); 981 runExpectedExceptionCatchingAsyncTest(expectedException, false); 982 } 983 984 public void testCatchingAsync_fallbackGeneratesCheckedException() throws Exception { 985 Exception expectedException = new Exception() {}; 986 runExpectedExceptionCatchingAsyncTest(expectedException, false); 987 } 988 989 public void testCatchingAsync_fallbackGeneratesError() throws Exception { 990 final Error error = new Error("deliberate"); 991 AsyncFunction<Throwable, Integer> fallback = 992 new AsyncFunction<Throwable, Integer>() { 993 @Override 994 public ListenableFuture<Integer> apply(Throwable t) throws Exception { 995 throw error; 996 } 997 }; 998 ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); 999 try { 1000 getDone(catchingAsync(failingFuture, Throwable.class, fallback, directExecutor())); 1001 fail(); 1002 } catch (ExecutionException expected) { 1003 assertSame(error, expected.getCause()); 1004 } 1005 } 1006 1007 public void testCatchingAsync_fallbackReturnsRuntimeException() throws Exception { 1008 RuntimeException expectedException = new RuntimeException(); 1009 runExpectedExceptionCatchingAsyncTest(expectedException, true); 1010 } 1011 1012 public void testCatchingAsync_fallbackReturnsCheckedException() throws Exception { 1013 Exception expectedException = new Exception() {}; 1014 runExpectedExceptionCatchingAsyncTest(expectedException, true); 1015 } 1016 1017 private void runExpectedExceptionCatchingAsyncTest( 1018 final Exception expectedException, final boolean wrapInFuture) throws Exception { 1019 AsyncFunctionSpy<Throwable, Integer> fallback = 1020 spy( 1021 new AsyncFunction<Throwable, Integer>() { 1022 @Override 1023 public ListenableFuture<Integer> apply(Throwable t) throws Exception { 1024 if (!wrapInFuture) { 1025 throw expectedException; 1026 } else { 1027 return immediateFailedFuture(expectedException); 1028 } 1029 } 1030 }); 1031 1032 ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); 1033 1034 ListenableFuture<Integer> faultTolerantFuture = 1035 catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); 1036 try { 1037 getDone(faultTolerantFuture); 1038 fail(); 1039 } catch (ExecutionException expected) { 1040 assertSame(expectedException, expected.getCause()); 1041 } 1042 fallback.verifyCallCount(1); 1043 } 1044 1045 public void testCatchingAsync_fallbackNotReady() throws Exception { 1046 ListenableFuture<Integer> primary = immediateFailedFuture(new Exception()); 1047 final SettableFuture<Integer> secondary = SettableFuture.create(); 1048 AsyncFunction<Throwable, Integer> fallback = 1049 new AsyncFunction<Throwable, Integer>() { 1050 @Override 1051 public ListenableFuture<Integer> apply(Throwable t) { 1052 return secondary; 1053 } 1054 }; 1055 ListenableFuture<Integer> derived = 1056 catchingAsync(primary, Throwable.class, fallback, directExecutor()); 1057 secondary.set(1); 1058 assertEquals(1, (int) getDone(derived)); 1059 } 1060 1061 public void testCatchingAsync_resultInterruptedBeforeFallback() throws Exception { 1062 SettableFuture<Integer> primary = SettableFuture.create(); 1063 AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); 1064 ListenableFuture<Integer> derived = 1065 catchingAsync(primary, Throwable.class, fallback, directExecutor()); 1066 derived.cancel(true); 1067 assertTrue(primary.isCancelled()); 1068 assertTrue(primary.wasInterrupted()); 1069 } 1070 1071 public void testCatchingAsync_resultCancelledBeforeFallback() throws Exception { 1072 SettableFuture<Integer> primary = SettableFuture.create(); 1073 AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); 1074 ListenableFuture<Integer> derived = 1075 catchingAsync(primary, Throwable.class, fallback, directExecutor()); 1076 derived.cancel(false); 1077 assertTrue(primary.isCancelled()); 1078 assertFalse(primary.wasInterrupted()); 1079 } 1080 1081 @GwtIncompatible // mocks 1082 // TODO(cpovirk): eliminate use of mocks 1083 @SuppressWarnings("unchecked") 1084 public void testCatchingAsync_resultCancelledAfterFallback() throws Exception { 1085 final SettableFuture<Integer> secondary = SettableFuture.create(); 1086 final RuntimeException raisedException = new RuntimeException(); 1087 AsyncFunctionSpy<Throwable, Integer> fallback = 1088 spy( 1089 new AsyncFunction<Throwable, Integer>() { 1090 @Override 1091 public ListenableFuture<Integer> apply(Throwable t) throws Exception { 1092 assertThat(t).isSameInstanceAs(raisedException); 1093 return secondary; 1094 } 1095 }); 1096 1097 ListenableFuture<Integer> failingFuture = immediateFailedFuture(raisedException); 1098 1099 ListenableFuture<Integer> derived = 1100 catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); 1101 derived.cancel(false); 1102 assertTrue(secondary.isCancelled()); 1103 assertFalse(secondary.wasInterrupted()); 1104 fallback.verifyCallCount(1); 1105 } 1106 1107 public void testCatchingAsync_nullInsteadOfFuture() throws Exception { 1108 ListenableFuture<Integer> inputFuture = immediateFailedFuture(new Exception()); 1109 ListenableFuture<?> chainedFuture = 1110 catchingAsync( 1111 inputFuture, 1112 Throwable.class, 1113 new AsyncFunction<Throwable, Integer>() { 1114 @Override 1115 @SuppressWarnings("AsyncFunctionReturnsNull") 1116 public ListenableFuture<Integer> apply(Throwable t) { 1117 return null; 1118 } 1119 }, 1120 directExecutor()); 1121 try { 1122 getDone(chainedFuture); 1123 fail(); 1124 } catch (ExecutionException expected) { 1125 NullPointerException cause = (NullPointerException) expected.getCause(); 1126 assertThat(cause) 1127 .hasMessageThat() 1128 .contains( 1129 "AsyncFunction.apply returned null instead of a Future. " 1130 + "Did you mean to return immediateFuture(null)?"); 1131 } 1132 } 1133 1134 @GwtIncompatible // threads 1135 1136 public void testCatchingAsync_interruptPropagatesToTransformingThread() throws Exception { 1137 SettableFuture<String> input = SettableFuture.create(); 1138 final CountDownLatch inFunction = new CountDownLatch(1); 1139 final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); 1140 final CountDownLatch gotException = new CountDownLatch(1); 1141 AsyncFunction<Throwable, String> function = 1142 new AsyncFunction<Throwable, String>() { 1143 @Override 1144 public ListenableFuture<String> apply(Throwable t) throws Exception { 1145 inFunction.countDown(); 1146 try { 1147 shouldCompleteFunction.await(); 1148 } catch (InterruptedException expected) { 1149 gotException.countDown(); 1150 throw expected; 1151 } 1152 return immediateFuture("a"); 1153 } 1154 }; 1155 1156 ListenableFuture<String> futureResult = 1157 catchingAsync(input, Exception.class, function, newSingleThreadExecutor()); 1158 1159 input.setException(new Exception()); 1160 inFunction.await(); 1161 futureResult.cancel(true); 1162 shouldCompleteFunction.countDown(); 1163 try { 1164 futureResult.get(); 1165 fail(); 1166 } catch (CancellationException expected) { 1167 } 1168 // TODO(cpovirk): implement interruption, updating this test: 1169 // https://github.com/google/guava/issues/1989 1170 assertEquals(1, gotException.getCount()); 1171 // gotException.await(); 1172 } 1173 1174 @GwtIncompatible // Threads 1175 1176 public void testCatchingAsync_functionToString() throws Exception { 1177 final CountDownLatch functionCalled = new CountDownLatch(1); 1178 final CountDownLatch functionBlocking = new CountDownLatch(1); 1179 AsyncFunction<Object, Object> function = 1180 tagged( 1181 "Called my toString", 1182 new AsyncFunction<Object, Object>() { 1183 @Override 1184 public ListenableFuture<Object> apply(Object input) throws Exception { 1185 functionCalled.countDown(); 1186 functionBlocking.await(); 1187 return immediateFuture(null); 1188 } 1189 }); 1190 1191 ExecutorService executor = Executors.newSingleThreadExecutor(); 1192 try { 1193 ListenableFuture<?> output = 1194 Futures.catchingAsync( 1195 immediateFailedFuture(new RuntimeException()), Throwable.class, function, executor); 1196 functionCalled.await(); 1197 assertThat(output.toString()).contains(function.toString()); 1198 } finally { 1199 functionBlocking.countDown(); 1200 executor.shutdown(); 1201 } 1202 } 1203 1204 public void testCatchingAsync_futureToString() throws Exception { 1205 final SettableFuture<Object> toReturn = SettableFuture.create(); 1206 AsyncFunction<Object, Object> function = 1207 tagged( 1208 "Called my toString", 1209 new AsyncFunction<Object, Object>() { 1210 @Override 1211 public ListenableFuture<Object> apply(Object input) throws Exception { 1212 return toReturn; 1213 } 1214 }); 1215 1216 ListenableFuture<?> output = 1217 Futures.catchingAsync( 1218 immediateFailedFuture(new RuntimeException()), 1219 Throwable.class, 1220 function, 1221 directExecutor()); 1222 assertThat(output.toString()).contains(toReturn.toString()); 1223 } 1224 1225 // catching tests cloned from the old withFallback tests: 1226 1227 public void testCatching_inputDoesNotRaiseException() throws Exception { 1228 Function<Throwable, Integer> fallback = unexpectedFunction(); 1229 ListenableFuture<Integer> originalFuture = immediateFuture(7); 1230 ListenableFuture<Integer> faultTolerantFuture = 1231 catching(originalFuture, Throwable.class, fallback, directExecutor()); 1232 assertEquals(7, getDone(faultTolerantFuture).intValue()); 1233 } 1234 1235 public void testCatching_inputRaisesException() throws Exception { 1236 final RuntimeException raisedException = new RuntimeException(); 1237 FunctionSpy<Throwable, Integer> fallback = 1238 spy( 1239 new Function<Throwable, Integer>() { 1240 @Override 1241 public Integer apply(Throwable t) { 1242 assertThat(t).isSameInstanceAs(raisedException); 1243 return 20; 1244 } 1245 }); 1246 ListenableFuture<Integer> failingFuture = immediateFailedFuture(raisedException); 1247 ListenableFuture<Integer> faultTolerantFuture = 1248 catching(failingFuture, Throwable.class, fallback, directExecutor()); 1249 assertEquals(20, getDone(faultTolerantFuture).intValue()); 1250 fallback.verifyCallCount(1); 1251 } 1252 1253 @GwtIncompatible // non-Throwable exceptionType 1254 public void testCatching_inputCancelledWithoutFallback() throws Exception { 1255 Function<IOException, Integer> fallback = unexpectedFunction(); 1256 ListenableFuture<Integer> originalFuture = immediateCancelledFuture(); 1257 ListenableFuture<Integer> faultTolerantFuture = 1258 catching(originalFuture, IOException.class, fallback, directExecutor()); 1259 assertTrue(faultTolerantFuture.isCancelled()); 1260 } 1261 1262 public void testCatching_fallbackGeneratesRuntimeException() throws Exception { 1263 RuntimeException expectedException = new RuntimeException(); 1264 runExpectedExceptionCatchingTest(expectedException); 1265 } 1266 1267 /* 1268 * catching() uses a plain Function, so there's no 1269 * testCatching_fallbackGeneratesCheckedException(). 1270 */ 1271 1272 public void testCatching_fallbackGeneratesError() throws Exception { 1273 final Error error = new Error("deliberate"); 1274 Function<Throwable, Integer> fallback = 1275 new Function<Throwable, Integer>() { 1276 @Override 1277 public Integer apply(Throwable t) { 1278 throw error; 1279 } 1280 }; 1281 ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); 1282 try { 1283 getDone(catching(failingFuture, Throwable.class, fallback, directExecutor())); 1284 fail(); 1285 } catch (ExecutionException expected) { 1286 assertSame(error, expected.getCause()); 1287 } 1288 } 1289 1290 /* 1291 * catching() uses a plain Function, so there's no testCatching_fallbackReturnsRuntimeException() 1292 * or testCatching_fallbackReturnsCheckedException(). 1293 */ 1294 1295 private void runExpectedExceptionCatchingTest(final RuntimeException expectedException) 1296 throws Exception { 1297 FunctionSpy<Throwable, Integer> fallback = 1298 spy( 1299 new Function<Throwable, Integer>() { 1300 @Override 1301 public Integer apply(Throwable t) { 1302 throw expectedException; 1303 } 1304 }); 1305 1306 ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); 1307 1308 ListenableFuture<Integer> faultTolerantFuture = 1309 catching(failingFuture, Throwable.class, fallback, directExecutor()); 1310 try { 1311 getDone(faultTolerantFuture); 1312 fail(); 1313 } catch (ExecutionException expected) { 1314 assertSame(expectedException, expected.getCause()); 1315 } 1316 fallback.verifyCallCount(1); 1317 } 1318 1319 // catching() uses a plain Function, so there's no testCatching_fallbackNotReady(). 1320 1321 public void testCatching_resultInterruptedBeforeFallback() throws Exception { 1322 SettableFuture<Integer> primary = SettableFuture.create(); 1323 Function<Throwable, Integer> fallback = unexpectedFunction(); 1324 ListenableFuture<Integer> derived = 1325 catching(primary, Throwable.class, fallback, directExecutor()); 1326 derived.cancel(true); 1327 assertTrue(primary.isCancelled()); 1328 assertTrue(primary.wasInterrupted()); 1329 } 1330 1331 public void testCatching_resultCancelledBeforeFallback() throws Exception { 1332 SettableFuture<Integer> primary = SettableFuture.create(); 1333 Function<Throwable, Integer> fallback = unexpectedFunction(); 1334 ListenableFuture<Integer> derived = 1335 catching(primary, Throwable.class, fallback, directExecutor()); 1336 derived.cancel(false); 1337 assertTrue(primary.isCancelled()); 1338 assertFalse(primary.wasInterrupted()); 1339 } 1340 1341 // catching() uses a plain Function, so there's no testCatching_resultCancelledAfterFallback(). 1342 1343 // catching() uses a plain Function, so there's no testCatching_nullInsteadOfFuture(). 1344 1345 // Some tests of the exceptionType parameter: 1346 1347 public void testCatching_Throwable() throws Exception { 1348 Function<Throwable, Integer> fallback = functionReturningOne(); 1349 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new IOException()); 1350 ListenableFuture<Integer> faultTolerantFuture = 1351 catching(originalFuture, Throwable.class, fallback, directExecutor()); 1352 assertEquals(1, (int) getDone(faultTolerantFuture)); 1353 } 1354 1355 @GwtIncompatible // non-Throwable exceptionType 1356 public void testCatching_customTypeMatch() throws Exception { 1357 Function<IOException, Integer> fallback = functionReturningOne(); 1358 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new FileNotFoundException()); 1359 ListenableFuture<Integer> faultTolerantFuture = 1360 catching(originalFuture, IOException.class, fallback, directExecutor()); 1361 assertEquals(1, (int) getDone(faultTolerantFuture)); 1362 } 1363 1364 @GwtIncompatible // non-Throwable exceptionType 1365 public void testCatching_customTypeNoMatch() throws Exception { 1366 Function<IOException, Integer> fallback = functionReturningOne(); 1367 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new RuntimeException()); 1368 ListenableFuture<Integer> faultTolerantFuture = 1369 catching(originalFuture, IOException.class, fallback, directExecutor()); 1370 try { 1371 getDone(faultTolerantFuture); 1372 fail(); 1373 } catch (ExecutionException expected) { 1374 assertThat(expected.getCause()).isInstanceOf(RuntimeException.class); 1375 } 1376 } 1377 1378 @GwtIncompatible // StackOverflowError 1379 public void testCatching_StackOverflow() throws Exception { 1380 { 1381 /* 1382 * Initialize all relevant classes before running the test, which may otherwise poison any 1383 * classes it is trying to load during its stack overflow. 1384 */ 1385 SettableFuture<Object> root = SettableFuture.create(); 1386 ListenableFuture<Object> unused = 1387 catching(root, MyException.class, identity(), directExecutor()); 1388 root.setException(new MyException()); 1389 } 1390 1391 SettableFuture<Object> root = SettableFuture.create(); 1392 ListenableFuture<Object> output = root; 1393 for (int i = 0; i < 10000; i++) { 1394 output = catching(output, MyException.class, identity(), directExecutor()); 1395 } 1396 try { 1397 root.setException(new MyException()); 1398 fail(); 1399 } catch (StackOverflowError expected) { 1400 } 1401 } 1402 1403 public void testCatching_ErrorAfterCancellation() throws Exception { 1404 class Fallback implements Function<Throwable, Object> { 1405 ListenableFuture<Object> output; 1406 1407 @Override 1408 public Object apply(Throwable input) { 1409 output.cancel(false); 1410 throw new MyError(); 1411 } 1412 } 1413 Fallback fallback = new Fallback(); 1414 SettableFuture<Object> input = SettableFuture.create(); 1415 1416 ListenableFuture<Object> output = catching(input, Throwable.class, fallback, directExecutor()); 1417 fallback.output = output; 1418 1419 input.setException(new MyException()); 1420 assertTrue(output.isCancelled()); 1421 } 1422 1423 public void testCatching_ExceptionAfterCancellation() throws Exception { 1424 class Fallback implements Function<Throwable, Object> { 1425 ListenableFuture<Object> output; 1426 1427 @Override 1428 public Object apply(Throwable input) { 1429 output.cancel(false); 1430 throw new MyRuntimeException(); 1431 } 1432 } 1433 Fallback fallback = new Fallback(); 1434 SettableFuture<Object> input = SettableFuture.create(); 1435 1436 ListenableFuture<Object> output = catching(input, Throwable.class, fallback, directExecutor()); 1437 fallback.output = output; 1438 1439 input.setException(new MyException()); 1440 assertTrue(output.isCancelled()); 1441 } 1442 1443 public void testCatching_getThrowsRuntimeException() throws Exception { 1444 ListenableFuture<Object> input = 1445 UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); 1446 1447 // We'd catch only MyRuntimeException.class here, but then the test won't compile under GWT. 1448 ListenableFuture<Object> output = 1449 catching(input, Throwable.class, identity(), directExecutor()); 1450 assertThat(getDone(output)).isInstanceOf(MyRuntimeException.class); 1451 } 1452 1453 public void testCatching_getThrowsError() throws Exception { 1454 ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); 1455 1456 // We'd catch only MyError.class here, but then the test won't compile under GWT. 1457 ListenableFuture<Object> output = 1458 catching(input, Throwable.class, identity(), directExecutor()); 1459 assertThat(getDone(output)).isInstanceOf(MyError.class); 1460 } 1461 1462 public void testCatching_listenerThrowsError() throws Exception { 1463 SettableFuture<Object> input = SettableFuture.create(); 1464 ListenableFuture<Object> output = 1465 catching(input, Throwable.class, identity(), directExecutor()); 1466 1467 output.addListener( 1468 new Runnable() { 1469 @Override 1470 public void run() { 1471 throw new MyError(); 1472 } 1473 }, 1474 directExecutor()); 1475 try { 1476 input.setException(new MyException()); 1477 fail(); 1478 } catch (MyError expected) { 1479 } 1480 } 1481 1482 public void testCatchingAsync_Throwable() throws Exception { 1483 AsyncFunction<Throwable, Integer> fallback = asyncFunctionReturningOne(); 1484 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new IOException()); 1485 ListenableFuture<Integer> faultTolerantFuture = 1486 catchingAsync(originalFuture, Throwable.class, fallback, directExecutor()); 1487 assertEquals(1, (int) getDone(faultTolerantFuture)); 1488 } 1489 1490 @GwtIncompatible // non-Throwable exceptionType 1491 public void testCatchingAsync_customTypeMatch() throws Exception { 1492 AsyncFunction<IOException, Integer> fallback = asyncFunctionReturningOne(); 1493 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new FileNotFoundException()); 1494 ListenableFuture<Integer> faultTolerantFuture = 1495 catchingAsync(originalFuture, IOException.class, fallback, directExecutor()); 1496 assertEquals(1, (int) getDone(faultTolerantFuture)); 1497 } 1498 1499 @GwtIncompatible // non-Throwable exceptionType 1500 public void testCatchingAsync_customTypeNoMatch() throws Exception { 1501 AsyncFunction<IOException, Integer> fallback = asyncFunctionReturningOne(); 1502 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new RuntimeException()); 1503 ListenableFuture<Integer> faultTolerantFuture = 1504 catchingAsync(originalFuture, IOException.class, fallback, directExecutor()); 1505 try { 1506 getDone(faultTolerantFuture); 1507 fail(); 1508 } catch (ExecutionException expected) { 1509 assertThat(expected.getCause()).isInstanceOf(RuntimeException.class); 1510 } 1511 } 1512 1513 @GwtIncompatible // StackOverflowError 1514 public void testCatchingAsync_StackOverflow() throws Exception { 1515 { 1516 /* 1517 * Initialize all relevant classes before running the test, which may otherwise poison any 1518 * classes it is trying to load during its stack overflow. 1519 */ 1520 SettableFuture<Object> root = SettableFuture.create(); 1521 ListenableFuture<Object> unused = 1522 catchingAsync(root, MyException.class, asyncIdentity(), directExecutor()); 1523 root.setException(new MyException()); 1524 } 1525 1526 SettableFuture<Object> root = SettableFuture.create(); 1527 ListenableFuture<Object> output = root; 1528 for (int i = 0; i < 10000; i++) { 1529 output = catchingAsync(output, MyException.class, asyncIdentity(), directExecutor()); 1530 } 1531 try { 1532 root.setException(new MyException()); 1533 fail(); 1534 } catch (StackOverflowError expected) { 1535 } 1536 } 1537 1538 public void testCatchingAsync_ErrorAfterCancellation() throws Exception { 1539 class Fallback implements AsyncFunction<Throwable, Object> { 1540 ListenableFuture<Object> output; 1541 1542 @Override 1543 public ListenableFuture<Object> apply(Throwable input) { 1544 output.cancel(false); 1545 throw new MyError(); 1546 } 1547 } 1548 Fallback fallback = new Fallback(); 1549 SettableFuture<Object> input = SettableFuture.create(); 1550 1551 ListenableFuture<Object> output = 1552 catchingAsync(input, Throwable.class, fallback, directExecutor()); 1553 fallback.output = output; 1554 1555 input.setException(new MyException()); 1556 assertTrue(output.isCancelled()); 1557 } 1558 1559 public void testCatchingAsync_ExceptionAfterCancellation() throws Exception { 1560 class Fallback implements AsyncFunction<Throwable, Object> { 1561 ListenableFuture<Object> output; 1562 1563 @Override 1564 public ListenableFuture<Object> apply(Throwable input) { 1565 output.cancel(false); 1566 throw new MyRuntimeException(); 1567 } 1568 } 1569 Fallback fallback = new Fallback(); 1570 SettableFuture<Object> input = SettableFuture.create(); 1571 1572 ListenableFuture<Object> output = 1573 catchingAsync(input, Throwable.class, fallback, directExecutor()); 1574 fallback.output = output; 1575 1576 input.setException(new MyException()); 1577 assertTrue(output.isCancelled()); 1578 } 1579 1580 public void testCatchingAsync_getThrowsRuntimeException() throws Exception { 1581 ListenableFuture<Object> input = 1582 UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); 1583 1584 // We'd catch only MyRuntimeException.class here, but then the test won't compile under GWT. 1585 ListenableFuture<Object> output = 1586 catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor()); 1587 assertThat(getDone(output)).isInstanceOf(MyRuntimeException.class); 1588 } 1589 1590 public void testCatchingAsync_getThrowsError() throws Exception { 1591 ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); 1592 1593 // We'd catch only MyError.class here, but then the test won't compile under GWT. 1594 ListenableFuture<Object> output = 1595 catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor()); 1596 assertThat(getDone(output)).isInstanceOf(MyError.class); 1597 } 1598 1599 public void testCatchingAsync_listenerThrowsError() throws Exception { 1600 SettableFuture<Object> input = SettableFuture.create(); 1601 ListenableFuture<Object> output = 1602 catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor()); 1603 1604 output.addListener( 1605 new Runnable() { 1606 @Override 1607 public void run() { 1608 throw new MyError(); 1609 } 1610 }, 1611 directExecutor()); 1612 try { 1613 input.setException(new MyException()); 1614 fail(); 1615 } catch (MyError expected) { 1616 } 1617 } 1618 1619 public void testCatching_rejectionPropagatesToOutput() throws Exception { 1620 SettableFuture<String> input = SettableFuture.create(); 1621 ListenableFuture<String> transformed = 1622 catching(input, Throwable.class, constant("foo"), REJECTING_EXECUTOR); 1623 input.setException(new Exception()); 1624 try { 1625 getDone(transformed); 1626 fail(); 1627 } catch (ExecutionException expected) { 1628 assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); 1629 } 1630 } 1631 1632 public void testCatchingAsync_rejectionPropagatesToOutput() throws Exception { 1633 SettableFuture<String> input = SettableFuture.create(); 1634 ListenableFuture<String> transformed = 1635 catchingAsync( 1636 input, 1637 Throwable.class, 1638 constantAsyncFunction(immediateFuture("foo")), 1639 REJECTING_EXECUTOR); 1640 input.setException(new Exception()); 1641 try { 1642 getDone(transformed); 1643 fail(); 1644 } catch (ExecutionException expected) { 1645 assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); 1646 } 1647 } 1648 1649 private <X extends Throwable> Function<X, Integer> functionReturningOne() { 1650 return new Function<X, Integer>() { 1651 @Override 1652 public Integer apply(X t) { 1653 return 1; 1654 } 1655 }; 1656 } 1657 1658 private <X extends Throwable> AsyncFunction<X, Integer> asyncFunctionReturningOne() { 1659 return new AsyncFunction<X, Integer>() { 1660 @Override 1661 public ListenableFuture<Integer> apply(X t) { 1662 return immediateFuture(1); 1663 } 1664 }; 1665 } 1666 1667 private static <I, O> AsyncFunction<I, O> constantAsyncFunction( 1668 final ListenableFuture<O> output) { 1669 return new AsyncFunction<I, O>() { 1670 @Override 1671 public ListenableFuture<O> apply(I input) { 1672 return output; 1673 } 1674 }; 1675 } 1676 1677 public void testTransformAsync_genericsWildcard_AsyncFunction() throws Exception { 1678 ListenableFuture<?> nullFuture = immediateFuture(null); 1679 ListenableFuture<?> chainedFuture = 1680 transformAsync(nullFuture, constantAsyncFunction(nullFuture), directExecutor()); 1681 assertNull(getDone(chainedFuture)); 1682 } 1683 1684 public void testTransformAsync_genericsHierarchy_AsyncFunction() throws Exception { 1685 ListenableFuture<FooChild> future = immediateFuture(null); 1686 final BarChild barChild = new BarChild(); 1687 AsyncFunction<Foo, BarChild> function = 1688 new AsyncFunction<Foo, BarChild>() { 1689 @Override 1690 public AbstractFuture<BarChild> apply(Foo unused) { 1691 AbstractFuture<BarChild> future = new AbstractFuture<BarChild>() {}; 1692 future.set(barChild); 1693 return future; 1694 } 1695 }; 1696 Bar bar = getDone(transformAsync(future, function, directExecutor())); 1697 assertSame(barChild, bar); 1698 } 1699 1700 @GwtIncompatible // get() timeout 1701 public void testTransformAsync_asyncFunction_timeout() 1702 throws InterruptedException, ExecutionException { 1703 AsyncFunction<String, Integer> function = constantAsyncFunction(immediateFuture(1)); 1704 ListenableFuture<Integer> future = 1705 transformAsync(SettableFuture.<String>create(), function, directExecutor()); 1706 try { 1707 future.get(1, MILLISECONDS); 1708 fail(); 1709 } catch (TimeoutException expected) { 1710 } 1711 } 1712 1713 public void testTransformAsync_asyncFunction_error() throws InterruptedException { 1714 final Error error = new Error("deliberate"); 1715 AsyncFunction<String, Integer> function = 1716 new AsyncFunction<String, Integer>() { 1717 @Override 1718 public ListenableFuture<Integer> apply(String input) { 1719 throw error; 1720 } 1721 }; 1722 SettableFuture<String> inputFuture = SettableFuture.create(); 1723 ListenableFuture<Integer> outputFuture = 1724 transformAsync(inputFuture, function, directExecutor()); 1725 inputFuture.set("value"); 1726 try { 1727 getDone(outputFuture); 1728 fail(); 1729 } catch (ExecutionException expected) { 1730 assertSame(error, expected.getCause()); 1731 } 1732 } 1733 1734 public void testTransformAsync_asyncFunction_nullInsteadOfFuture() throws Exception { 1735 ListenableFuture<?> inputFuture = immediateFuture("a"); 1736 ListenableFuture<?> chainedFuture = 1737 transformAsync(inputFuture, constantAsyncFunction(null), directExecutor()); 1738 try { 1739 getDone(chainedFuture); 1740 fail(); 1741 } catch (ExecutionException expected) { 1742 NullPointerException cause = (NullPointerException) expected.getCause(); 1743 assertThat(cause) 1744 .hasMessageThat() 1745 .contains( 1746 "AsyncFunction.apply returned null instead of a Future. " 1747 + "Did you mean to return immediateFuture(null)?"); 1748 } 1749 } 1750 1751 @GwtIncompatible // threads 1752 1753 public void testTransformAsync_asyncFunction_cancelledWhileApplyingFunction() 1754 throws InterruptedException, ExecutionException { 1755 final CountDownLatch inFunction = new CountDownLatch(1); 1756 final CountDownLatch functionDone = new CountDownLatch(1); 1757 final SettableFuture<Integer> resultFuture = SettableFuture.create(); 1758 AsyncFunction<String, Integer> function = 1759 new AsyncFunction<String, Integer>() { 1760 @Override 1761 public ListenableFuture<Integer> apply(String input) throws Exception { 1762 inFunction.countDown(); 1763 functionDone.await(); 1764 return resultFuture; 1765 } 1766 }; 1767 SettableFuture<String> inputFuture = SettableFuture.create(); 1768 ListenableFuture<Integer> future = 1769 transformAsync(inputFuture, function, newSingleThreadExecutor()); 1770 inputFuture.set("value"); 1771 inFunction.await(); 1772 future.cancel(false); 1773 functionDone.countDown(); 1774 try { 1775 future.get(); 1776 fail(); 1777 } catch (CancellationException expected) { 1778 } 1779 try { 1780 resultFuture.get(); 1781 fail(); 1782 } catch (CancellationException expected) { 1783 } 1784 } 1785 1786 @GwtIncompatible // threads 1787 1788 public void testTransformAsync_asyncFunction_cancelledBeforeApplyingFunction() 1789 throws InterruptedException { 1790 final AtomicBoolean functionCalled = new AtomicBoolean(); 1791 AsyncFunction<String, Integer> function = 1792 new AsyncFunction<String, Integer>() { 1793 @Override 1794 public ListenableFuture<Integer> apply(String input) throws Exception { 1795 functionCalled.set(true); 1796 return immediateFuture(1); 1797 } 1798 }; 1799 SettableFuture<String> inputFuture = SettableFuture.create(); 1800 ExecutorService executor = newSingleThreadExecutor(); 1801 ListenableFuture<Integer> future = transformAsync(inputFuture, function, executor); 1802 1803 // Pause the executor. 1804 final CountDownLatch beforeFunction = new CountDownLatch(1); 1805 executor.execute( 1806 new Runnable() { 1807 @Override 1808 public void run() { 1809 awaitUninterruptibly(beforeFunction); 1810 } 1811 }); 1812 1813 // Cancel the future after making input available. 1814 inputFuture.set("value"); 1815 future.cancel(false); 1816 1817 // Unpause the executor. 1818 beforeFunction.countDown(); 1819 executor.shutdown(); 1820 assertTrue(executor.awaitTermination(5, SECONDS)); 1821 1822 assertFalse(functionCalled.get()); 1823 } 1824 1825 public void testSubmitAsync_asyncCallable_error() throws InterruptedException { 1826 final Error error = new Error("deliberate"); 1827 AsyncCallable<Integer> callable = 1828 new AsyncCallable<Integer>() { 1829 @Override 1830 public ListenableFuture<Integer> call() { 1831 throw error; 1832 } 1833 }; 1834 SettableFuture<String> inputFuture = SettableFuture.create(); 1835 ListenableFuture<Integer> outputFuture = submitAsync(callable, directExecutor()); 1836 inputFuture.set("value"); 1837 try { 1838 getDone(outputFuture); 1839 fail(); 1840 } catch (ExecutionException expected) { 1841 assertSame(error, expected.getCause()); 1842 } 1843 } 1844 1845 public void testSubmitAsync_asyncCallable_nullInsteadOfFuture() throws Exception { 1846 ListenableFuture<?> chainedFuture = submitAsync(constantAsyncCallable(null), directExecutor()); 1847 try { 1848 getDone(chainedFuture); 1849 fail(); 1850 } catch (ExecutionException expected) { 1851 NullPointerException cause = (NullPointerException) expected.getCause(); 1852 assertThat(cause) 1853 .hasMessageThat() 1854 .contains( 1855 "AsyncCallable.call returned null instead of a Future. " 1856 + "Did you mean to return immediateFuture(null)?"); 1857 } 1858 } 1859 1860 @GwtIncompatible // threads 1861 1862 public void testSubmitAsync_asyncCallable_cancelledWhileApplyingFunction() 1863 throws InterruptedException, ExecutionException { 1864 final CountDownLatch inFunction = new CountDownLatch(1); 1865 final CountDownLatch callableDone = new CountDownLatch(1); 1866 final SettableFuture<Integer> resultFuture = SettableFuture.create(); 1867 AsyncCallable<Integer> callable = 1868 new AsyncCallable<Integer>() { 1869 @Override 1870 public ListenableFuture<Integer> call() throws InterruptedException { 1871 inFunction.countDown(); 1872 callableDone.await(); 1873 return resultFuture; 1874 } 1875 }; 1876 SettableFuture<String> inputFuture = SettableFuture.create(); 1877 ListenableFuture<Integer> future = submitAsync(callable, newSingleThreadExecutor()); 1878 inputFuture.set("value"); 1879 inFunction.await(); 1880 future.cancel(false); 1881 callableDone.countDown(); 1882 try { 1883 future.get(); 1884 fail(); 1885 } catch (CancellationException expected) { 1886 } 1887 try { 1888 resultFuture.get(); 1889 fail(); 1890 } catch (CancellationException expected) { 1891 } 1892 } 1893 1894 @GwtIncompatible // threads 1895 1896 public void testSubmitAsync_asyncCallable_cancelledBeforeApplyingFunction() 1897 throws InterruptedException { 1898 final AtomicBoolean callableCalled = new AtomicBoolean(); 1899 AsyncCallable<Integer> callable = 1900 new AsyncCallable<Integer>() { 1901 @Override 1902 public ListenableFuture<Integer> call() { 1903 callableCalled.set(true); 1904 return immediateFuture(1); 1905 } 1906 }; 1907 ExecutorService executor = newSingleThreadExecutor(); 1908 // Pause the executor. 1909 final CountDownLatch beforeFunction = new CountDownLatch(1); 1910 executor.execute( 1911 new Runnable() { 1912 @Override 1913 public void run() { 1914 awaitUninterruptibly(beforeFunction); 1915 } 1916 }); 1917 ListenableFuture<Integer> future = submitAsync(callable, executor); 1918 future.cancel(false); 1919 1920 // Unpause the executor. 1921 beforeFunction.countDown(); 1922 executor.shutdown(); 1923 assertTrue(executor.awaitTermination(5, SECONDS)); 1924 1925 assertFalse(callableCalled.get()); 1926 } 1927 1928 @GwtIncompatible // threads 1929 1930 public void testSubmitAsync_asyncCallable_returnsInterruptedFuture() throws InterruptedException { 1931 assertThat(Thread.interrupted()).isFalse(); 1932 SettableFuture<Integer> cancelledFuture = SettableFuture.create(); 1933 cancelledFuture.cancel(true); 1934 assertThat(Thread.interrupted()).isFalse(); 1935 ListenableFuture<Integer> future = 1936 submitAsync(constantAsyncCallable(cancelledFuture), directExecutor()); 1937 assertThat(future.isDone()).isTrue(); 1938 assertThat(Thread.interrupted()).isFalse(); 1939 } 1940 1941 public void testSubmit_callable_returnsValue() throws Exception { 1942 Callable<Integer> callable = 1943 new Callable<Integer>() { 1944 @Override 1945 public Integer call() { 1946 return 42; 1947 } 1948 }; 1949 ListenableFuture<Integer> future = submit(callable, directExecutor()); 1950 assertThat(future.isDone()).isTrue(); 1951 assertThat(getDone(future)).isEqualTo(42); 1952 } 1953 1954 public void testSubmit_callable_throwsException() { 1955 final Exception exception = new Exception("Exception for testing"); 1956 Callable<Integer> callable = 1957 new Callable<Integer>() { 1958 @Override 1959 public Integer call() throws Exception { 1960 throw exception; 1961 } 1962 }; 1963 ListenableFuture<Integer> future = submit(callable, directExecutor()); 1964 try { 1965 getDone(future); 1966 fail(); 1967 } catch (ExecutionException expected) { 1968 assertThat(expected).hasCauseThat().isSameInstanceAs(exception); 1969 } 1970 } 1971 1972 public void testSubmit_runnable_completesAfterRun() throws Exception { 1973 final List<Runnable> pendingRunnables = newArrayList(); 1974 final List<Runnable> executedRunnables = newArrayList(); 1975 Runnable runnable = 1976 new Runnable() { 1977 @Override 1978 public void run() { 1979 executedRunnables.add(this); 1980 } 1981 }; 1982 Executor executor = 1983 new Executor() { 1984 @Override 1985 public void execute(Runnable runnable) { 1986 pendingRunnables.add(runnable); 1987 } 1988 }; 1989 ListenableFuture<Void> future = submit(runnable, executor); 1990 assertThat(future.isDone()).isFalse(); 1991 assertThat(executedRunnables).isEmpty(); 1992 assertThat(pendingRunnables).hasSize(1); 1993 pendingRunnables.remove(0).run(); 1994 assertThat(future.isDone()).isTrue(); 1995 assertThat(executedRunnables).containsExactly(runnable); 1996 assertThat(pendingRunnables).isEmpty(); 1997 } 1998 1999 public void testSubmit_runnable_throwsException() throws Exception { 2000 final RuntimeException exception = new RuntimeException("Exception for testing"); 2001 Runnable runnable = 2002 new Runnable() { 2003 @Override 2004 public void run() { 2005 throw exception; 2006 } 2007 }; 2008 ListenableFuture<Void> future = submit(runnable, directExecutor()); 2009 try { 2010 getDone(future); 2011 fail(); 2012 } catch (ExecutionException expected) { 2013 assertThat(expected).hasCauseThat().isSameInstanceAs(exception); 2014 } 2015 } 2016 2017 @GwtIncompatible // threads 2018 2019 public void testScheduleAsync_asyncCallable_error() throws InterruptedException { 2020 final Error error = new Error("deliberate"); 2021 AsyncCallable<Integer> callable = 2022 new AsyncCallable<Integer>() { 2023 @Override 2024 public ListenableFuture<Integer> call() { 2025 throw error; 2026 } 2027 }; 2028 SettableFuture<String> inputFuture = SettableFuture.create(); 2029 ListenableFuture<Integer> outputFuture = submitAsync(callable, directExecutor()); 2030 inputFuture.set("value"); 2031 try { 2032 getDone(outputFuture); 2033 fail(); 2034 } catch (ExecutionException expected) { 2035 assertSame(error, expected.getCause()); 2036 } 2037 } 2038 2039 @GwtIncompatible // threads 2040 2041 public void testScheduleAsync_asyncCallable_nullInsteadOfFuture() throws Exception { 2042 ListenableFuture<?> chainedFuture = 2043 scheduleAsync( 2044 constantAsyncCallable(null), 2045 1, 2046 TimeUnit.NANOSECONDS, 2047 newSingleThreadScheduledExecutor()); 2048 try { 2049 chainedFuture.get(); 2050 fail(); 2051 } catch (ExecutionException expected) { 2052 NullPointerException cause = (NullPointerException) expected.getCause(); 2053 assertThat(cause) 2054 .hasMessageThat() 2055 .contains( 2056 "AsyncCallable.call returned null instead of a Future. " 2057 + "Did you mean to return immediateFuture(null)?"); 2058 } 2059 } 2060 2061 @GwtIncompatible // threads 2062 2063 public void testScheduleAsync_asyncCallable_cancelledWhileApplyingFunction() 2064 throws InterruptedException, ExecutionException { 2065 final CountDownLatch inFunction = new CountDownLatch(1); 2066 final CountDownLatch callableDone = new CountDownLatch(1); 2067 final SettableFuture<Integer> resultFuture = SettableFuture.create(); 2068 AsyncCallable<Integer> callable = 2069 new AsyncCallable<Integer>() { 2070 @Override 2071 public ListenableFuture<Integer> call() throws InterruptedException { 2072 inFunction.countDown(); 2073 callableDone.await(); 2074 return resultFuture; 2075 } 2076 }; 2077 ListenableFuture<Integer> future = 2078 scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, newSingleThreadScheduledExecutor()); 2079 inFunction.await(); 2080 future.cancel(false); 2081 callableDone.countDown(); 2082 try { 2083 future.get(); 2084 fail(); 2085 } catch (CancellationException expected) { 2086 } 2087 try { 2088 resultFuture.get(); 2089 fail(); 2090 } catch (CancellationException expected) { 2091 } 2092 } 2093 2094 @GwtIncompatible // threads 2095 2096 public void testScheduleAsync_asyncCallable_cancelledBeforeCallingFunction() 2097 throws InterruptedException { 2098 final AtomicBoolean callableCalled = new AtomicBoolean(); 2099 AsyncCallable<Integer> callable = 2100 new AsyncCallable<Integer>() { 2101 @Override 2102 public ListenableFuture<Integer> call() { 2103 callableCalled.set(true); 2104 return immediateFuture(1); 2105 } 2106 }; 2107 ScheduledExecutorService executor = newSingleThreadScheduledExecutor(); 2108 // Pause the executor. 2109 final CountDownLatch beforeFunction = new CountDownLatch(1); 2110 executor.execute( 2111 new Runnable() { 2112 @Override 2113 public void run() { 2114 awaitUninterruptibly(beforeFunction); 2115 } 2116 }); 2117 ListenableFuture<Integer> future = scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, executor); 2118 future.cancel(false); 2119 2120 // Unpause the executor. 2121 beforeFunction.countDown(); 2122 executor.shutdown(); 2123 assertTrue(executor.awaitTermination(5, SECONDS)); 2124 2125 assertFalse(callableCalled.get()); 2126 } 2127 2128 private static <T> AsyncCallable<T> constantAsyncCallable(final ListenableFuture<T> returnValue) { 2129 return new AsyncCallable<T>() { 2130 @Override 2131 public ListenableFuture<T> call() { 2132 return returnValue; 2133 } 2134 }; 2135 } 2136 2137 /** Runnable which can be called a single time, and only after {@link #expectCall} is called. */ 2138 // TODO(cpovirk): top-level class? 2139 private static class SingleCallListener implements Runnable { 2140 2141 private boolean expectCall = false; 2142 private final AtomicBoolean called = new AtomicBoolean(); 2143 2144 @Override 2145 public void run() { 2146 assertTrue("Listener called before it was expected", expectCall); 2147 assertFalse("Listener called more than once", wasCalled()); 2148 called.set(true); 2149 } 2150 2151 public void expectCall() { 2152 assertFalse("expectCall is already true", expectCall); 2153 expectCall = true; 2154 } 2155 2156 public boolean wasCalled() { 2157 return called.get(); 2158 } 2159 } 2160 2161 public void testAllAsList() throws Exception { 2162 // Create input and output 2163 SettableFuture<String> future1 = SettableFuture.create(); 2164 SettableFuture<String> future2 = SettableFuture.create(); 2165 SettableFuture<String> future3 = SettableFuture.create(); 2166 @SuppressWarnings("unchecked") // array is never modified 2167 ListenableFuture<List<String>> compound = allAsList(future1, future2, future3); 2168 2169 // Attach a listener 2170 SingleCallListener listener = new SingleCallListener(); 2171 compound.addListener(listener, directExecutor()); 2172 2173 // Satisfy each input and check the output 2174 assertFalse(compound.isDone()); 2175 future1.set(DATA1); 2176 assertFalse(compound.isDone()); 2177 future2.set(DATA2); 2178 assertFalse(compound.isDone()); 2179 listener.expectCall(); 2180 future3.set(DATA3); 2181 assertTrue(listener.wasCalled()); 2182 2183 List<String> results = getDone(compound); 2184 assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder(); 2185 } 2186 2187 public void testAllAsList_emptyList() throws Exception { 2188 SingleCallListener listener = new SingleCallListener(); 2189 listener.expectCall(); 2190 List<ListenableFuture<String>> futures = ImmutableList.of(); 2191 ListenableFuture<List<String>> compound = allAsList(futures); 2192 compound.addListener(listener, directExecutor()); 2193 assertThat(getDone(compound)).isEmpty(); 2194 assertTrue(listener.wasCalled()); 2195 } 2196 2197 public void testAllAsList_emptyArray() throws Exception { 2198 SingleCallListener listener = new SingleCallListener(); 2199 listener.expectCall(); 2200 @SuppressWarnings("unchecked") // array is never modified 2201 ListenableFuture<List<String>> compound = allAsList(); 2202 compound.addListener(listener, directExecutor()); 2203 assertThat(getDone(compound)).isEmpty(); 2204 assertTrue(listener.wasCalled()); 2205 } 2206 2207 public void testAllAsList_failure() throws Exception { 2208 SingleCallListener listener = new SingleCallListener(); 2209 SettableFuture<String> future1 = SettableFuture.create(); 2210 SettableFuture<String> future2 = SettableFuture.create(); 2211 @SuppressWarnings("unchecked") // array is never modified 2212 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2213 compound.addListener(listener, directExecutor()); 2214 2215 listener.expectCall(); 2216 Throwable exception = new Throwable("failed1"); 2217 future1.setException(exception); 2218 assertTrue(compound.isDone()); 2219 assertTrue(listener.wasCalled()); 2220 assertFalse(future2.isDone()); 2221 2222 try { 2223 getDone(compound); 2224 fail(); 2225 } catch (ExecutionException expected) { 2226 assertSame(exception, expected.getCause()); 2227 } 2228 } 2229 2230 public void testAllAsList_singleFailure() throws Exception { 2231 Throwable exception = new Throwable("failed"); 2232 ListenableFuture<String> future = immediateFailedFuture(exception); 2233 ListenableFuture<List<String>> compound = allAsList(ImmutableList.of(future)); 2234 2235 try { 2236 getDone(compound); 2237 fail(); 2238 } catch (ExecutionException expected) { 2239 assertSame(exception, expected.getCause()); 2240 } 2241 } 2242 2243 public void testAllAsList_immediateFailure() throws Exception { 2244 Throwable exception = new Throwable("failed"); 2245 ListenableFuture<String> future1 = immediateFailedFuture(exception); 2246 ListenableFuture<String> future2 = immediateFuture("results"); 2247 ListenableFuture<List<String>> compound = allAsList(ImmutableList.of(future1, future2)); 2248 2249 try { 2250 getDone(compound); 2251 fail(); 2252 } catch (ExecutionException expected) { 2253 assertSame(exception, expected.getCause()); 2254 } 2255 } 2256 2257 public void testAllAsList_error() throws Exception { 2258 Error error = new Error("deliberate"); 2259 SettableFuture<String> future1 = SettableFuture.create(); 2260 ListenableFuture<String> future2 = immediateFuture("results"); 2261 ListenableFuture<List<String>> compound = allAsList(ImmutableList.of(future1, future2)); 2262 2263 future1.setException(error); 2264 try { 2265 getDone(compound); 2266 fail(); 2267 } catch (ExecutionException expected) { 2268 assertSame(error, expected.getCause()); 2269 } 2270 } 2271 2272 public void testAllAsList_cancelled() throws Exception { 2273 SingleCallListener listener = new SingleCallListener(); 2274 SettableFuture<String> future1 = SettableFuture.create(); 2275 SettableFuture<String> future2 = SettableFuture.create(); 2276 @SuppressWarnings("unchecked") // array is never modified 2277 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2278 compound.addListener(listener, directExecutor()); 2279 2280 listener.expectCall(); 2281 future1.cancel(true); 2282 assertTrue(compound.isDone()); 2283 assertTrue(listener.wasCalled()); 2284 assertFalse(future2.isDone()); 2285 2286 try { 2287 getDone(compound); 2288 fail(); 2289 } catch (CancellationException expected) { 2290 } 2291 } 2292 2293 public void testAllAsList_resultCancelled() throws Exception { 2294 SettableFuture<String> future1 = SettableFuture.create(); 2295 SettableFuture<String> future2 = SettableFuture.create(); 2296 @SuppressWarnings("unchecked") // array is never modified 2297 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2298 2299 future2.set(DATA2); 2300 assertFalse(compound.isDone()); 2301 assertTrue(compound.cancel(false)); 2302 assertTrue(compound.isCancelled()); 2303 assertTrue(future1.isCancelled()); 2304 assertFalse(future1.wasInterrupted()); 2305 } 2306 2307 public void testAllAsList_resultCancelledInterrupted_withSecondaryListFuture() throws Exception { 2308 SettableFuture<String> future1 = SettableFuture.create(); 2309 SettableFuture<String> future2 = SettableFuture.create(); 2310 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2311 // There was a bug where the event listener for the combined future would 2312 // result in the sub-futures being cancelled without being interrupted. 2313 ListenableFuture<List<String>> otherCompound = allAsList(future1, future2); 2314 2315 assertTrue(compound.cancel(true)); 2316 assertTrue(future1.isCancelled()); 2317 assertTrue(future1.wasInterrupted()); 2318 assertTrue(future2.isCancelled()); 2319 assertTrue(future2.wasInterrupted()); 2320 assertTrue(otherCompound.isCancelled()); 2321 } 2322 2323 public void testAllAsList_resultCancelled_withSecondaryListFuture() throws Exception { 2324 SettableFuture<String> future1 = SettableFuture.create(); 2325 SettableFuture<String> future2 = SettableFuture.create(); 2326 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2327 // This next call is "unused," but it is an important part of the test. Don't remove it! 2328 ListenableFuture<List<String>> unused = allAsList(future1, future2); 2329 2330 assertTrue(compound.cancel(false)); 2331 assertTrue(future1.isCancelled()); 2332 assertFalse(future1.wasInterrupted()); 2333 assertTrue(future2.isCancelled()); 2334 assertFalse(future2.wasInterrupted()); 2335 } 2336 2337 public void testAllAsList_resultInterrupted() throws Exception { 2338 SettableFuture<String> future1 = SettableFuture.create(); 2339 SettableFuture<String> future2 = SettableFuture.create(); 2340 @SuppressWarnings("unchecked") // array is never modified 2341 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2342 2343 future2.set(DATA2); 2344 assertFalse(compound.isDone()); 2345 assertTrue(compound.cancel(true)); 2346 assertTrue(compound.isCancelled()); 2347 assertTrue(future1.isCancelled()); 2348 assertTrue(future1.wasInterrupted()); 2349 } 2350 2351 /** 2352 * Test the case where the futures are fulfilled prior to constructing the ListFuture. There was a 2353 * bug where the loop that connects a Listener to each of the futures would die on the last 2354 * loop-check as done() on ListFuture nulled out the variable being looped over (the list of 2355 * futures). 2356 */ 2357 public void testAllAsList_doneFutures() throws Exception { 2358 // Create input and output 2359 SettableFuture<String> future1 = SettableFuture.create(); 2360 SettableFuture<String> future2 = SettableFuture.create(); 2361 SettableFuture<String> future3 = SettableFuture.create(); 2362 2363 // Satisfy each input prior to creating compound and check the output 2364 future1.set(DATA1); 2365 future2.set(DATA2); 2366 future3.set(DATA3); 2367 2368 @SuppressWarnings("unchecked") // array is never modified 2369 ListenableFuture<List<String>> compound = allAsList(future1, future2, future3); 2370 2371 // Attach a listener 2372 SingleCallListener listener = new SingleCallListener(); 2373 listener.expectCall(); 2374 compound.addListener(listener, directExecutor()); 2375 2376 assertTrue(listener.wasCalled()); 2377 2378 List<String> results = getDone(compound); 2379 assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder(); 2380 } 2381 2382 /** A single non-error failure is not logged because it is reported via the output future. */ 2383 @SuppressWarnings("unchecked") 2384 public void testAllAsList_logging_exception() throws Exception { 2385 try { 2386 getDone(allAsList(immediateFailedFuture(new MyException()))); 2387 fail(); 2388 } catch (ExecutionException expected) { 2389 assertThat(expected.getCause()).isInstanceOf(MyException.class); 2390 assertEquals( 2391 "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size()); 2392 } 2393 } 2394 2395 /** Ensure that errors are always logged. */ 2396 @SuppressWarnings("unchecked") 2397 public void testAllAsList_logging_error() throws Exception { 2398 try { 2399 getDone(allAsList(immediateFailedFuture(new MyError()))); 2400 fail(); 2401 } catch (ExecutionException expected) { 2402 assertThat(expected.getCause()).isInstanceOf(MyError.class); 2403 List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); 2404 assertThat(logged).hasSize(1); // errors are always logged 2405 assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class); 2406 } 2407 } 2408 2409 /** All as list will log extra exceptions that have already occurred. */ 2410 @SuppressWarnings("unchecked") 2411 public void testAllAsList_logging_multipleExceptions_alreadyDone() throws Exception { 2412 try { 2413 getDone( 2414 allAsList( 2415 immediateFailedFuture(new MyException()), immediateFailedFuture(new MyException()))); 2416 fail(); 2417 } catch (ExecutionException expected) { 2418 assertThat(expected.getCause()).isInstanceOf(MyException.class); 2419 List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); 2420 assertThat(logged).hasSize(1); // the second failure is logged 2421 assertThat(logged.get(0).getThrown()).isInstanceOf(MyException.class); 2422 } 2423 } 2424 2425 /** All as list will log extra exceptions that occur later. */ 2426 @SuppressWarnings("unchecked") 2427 public void testAllAsList_logging_multipleExceptions_doneLater() throws Exception { 2428 SettableFuture<Object> future1 = SettableFuture.create(); 2429 SettableFuture<Object> future2 = SettableFuture.create(); 2430 SettableFuture<Object> future3 = SettableFuture.create(); 2431 ListenableFuture<List<Object>> all = allAsList(future1, future2, future3); 2432 2433 future1.setException(new MyException()); 2434 future2.setException(new MyException()); 2435 future3.setException(new MyException()); 2436 2437 try { 2438 getDone(all); 2439 fail(); 2440 } catch (ExecutionException expected) { 2441 List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); 2442 assertThat(logged).hasSize(2); // failures after the first are logged 2443 assertThat(logged.get(0).getThrown()).isInstanceOf(MyException.class); 2444 assertThat(logged.get(1).getThrown()).isInstanceOf(MyException.class); 2445 } 2446 } 2447 2448 /** The same exception happening on multiple futures should not be logged. */ 2449 @SuppressWarnings("unchecked") 2450 public void testAllAsList_logging_same_exception() throws Exception { 2451 try { 2452 MyException sameInstance = new MyException(); 2453 getDone(allAsList(immediateFailedFuture(sameInstance), immediateFailedFuture(sameInstance))); 2454 fail(); 2455 } catch (ExecutionException expected) { 2456 assertThat(expected.getCause()).isInstanceOf(MyException.class); 2457 assertEquals( 2458 "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size()); 2459 } 2460 } 2461 2462 public void testAllAsList_logging_seenExceptionUpdateRace() throws Exception { 2463 final MyException sameInstance = new MyException(); 2464 SettableFuture<Object> firstFuture = SettableFuture.create(); 2465 final SettableFuture<Object> secondFuture = SettableFuture.create(); 2466 ListenableFuture<List<Object>> bulkFuture = allAsList(firstFuture, secondFuture); 2467 2468 bulkFuture.addListener( 2469 new Runnable() { 2470 @Override 2471 public void run() { 2472 /* 2473 * firstFuture just completed, but AggregateFuture hasn't yet had time to record the 2474 * exception in seenExceptions. When we complete secondFuture with the same exception, 2475 * we want for AggregateFuture to still detect that it's been previously seen. 2476 */ 2477 secondFuture.setException(sameInstance); 2478 } 2479 }, 2480 directExecutor()); 2481 firstFuture.setException(sameInstance); 2482 2483 try { 2484 getDone(bulkFuture); 2485 fail(); 2486 } catch (ExecutionException expected) { 2487 assertThat(expected.getCause()).isInstanceOf(MyException.class); 2488 assertThat(aggregateFutureLogHandler.getStoredLogRecords()).isEmpty(); 2489 } 2490 } 2491 2492 public void testAllAsList_logging_seenExceptionUpdateCancelRace() throws Exception { 2493 final MyException subsequentFailure = new MyException(); 2494 SettableFuture<Object> firstFuture = SettableFuture.create(); 2495 final SettableFuture<Object> secondFuture = SettableFuture.create(); 2496 ListenableFuture<List<Object>> bulkFuture = allAsList(firstFuture, secondFuture); 2497 2498 bulkFuture.addListener( 2499 new Runnable() { 2500 @Override 2501 public void run() { 2502 /* 2503 * This is similar to the above test, but this time we're making sure that we recognize 2504 * that the output Future is done early not because of an exception but because of a 2505 * cancellation. 2506 */ 2507 secondFuture.setException(subsequentFailure); 2508 } 2509 }, 2510 directExecutor()); 2511 firstFuture.cancel(false); 2512 2513 try { 2514 getDone(bulkFuture); 2515 fail(); 2516 } catch (CancellationException expected) { 2517 assertThat(getOnlyElement(aggregateFutureLogHandler.getStoredLogRecords()).getThrown()) 2518 .isSameInstanceAs(subsequentFailure); 2519 } 2520 } 2521 2522 /** 2523 * Different exceptions happening on multiple futures with the same cause should not be logged. 2524 */ 2525 @SuppressWarnings("unchecked") 2526 public void testAllAsList_logging_same_cause() throws Exception { 2527 try { 2528 MyException exception1 = new MyException(); 2529 MyException exception2 = new MyException(); 2530 MyException exception3 = new MyException(); 2531 2532 MyException sameInstance = new MyException(); 2533 exception1.initCause(sameInstance); 2534 exception2.initCause(sameInstance); 2535 exception3.initCause(exception2); 2536 getDone(allAsList(immediateFailedFuture(exception1), immediateFailedFuture(exception3))); 2537 fail(); 2538 } catch (ExecutionException expected) { 2539 assertThat(expected.getCause()).isInstanceOf(MyException.class); 2540 assertEquals( 2541 "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size()); 2542 } 2543 } 2544 2545 private static String createCombinedResult(Integer i, Boolean b) { 2546 return "-" + i + "-" + b; 2547 } 2548 2549 @GwtIncompatible // threads 2550 2551 public void testWhenAllComplete_noLeakInterruption() throws Exception { 2552 final SettableFuture<String> stringFuture = SettableFuture.create(); 2553 AsyncCallable<String> combiner = 2554 new AsyncCallable<String>() { 2555 @Override 2556 public ListenableFuture<String> call() throws Exception { 2557 return stringFuture; 2558 } 2559 }; 2560 2561 ListenableFuture<String> futureResult = whenAllComplete().callAsync(combiner, directExecutor()); 2562 2563 assertThat(Thread.interrupted()).isFalse(); 2564 futureResult.cancel(true); 2565 assertThat(Thread.interrupted()).isFalse(); 2566 } 2567 2568 public void testWhenAllComplete_wildcard() throws Exception { 2569 ListenableFuture<?> futureA = immediateFuture("a"); 2570 ListenableFuture<?> futureB = immediateFuture("b"); 2571 ListenableFuture<?>[] futures = new ListenableFuture<?>[0]; 2572 Callable<String> combiner = 2573 new Callable<String>() { 2574 @Override 2575 public String call() throws Exception { 2576 return "hi"; 2577 } 2578 }; 2579 2580 // We'd like for all the following to compile. 2581 ListenableFuture<String> unused; 2582 2583 // Compiles: 2584 unused = whenAllComplete(futureA, futureB).call(combiner, directExecutor()); 2585 2586 // Does not compile: 2587 // unused = whenAllComplete(futures).call(combiner); 2588 2589 // Workaround for the above: 2590 unused = whenAllComplete(asList(futures)).call(combiner, directExecutor()); 2591 } 2592 2593 @GwtIncompatible // threads 2594 2595 public void testWhenAllComplete_asyncResult() throws Exception { 2596 SettableFuture<Integer> futureInteger = SettableFuture.create(); 2597 SettableFuture<Boolean> futureBoolean = SettableFuture.create(); 2598 2599 final ExecutorService executor = newSingleThreadExecutor(); 2600 final CountDownLatch callableBlocking = new CountDownLatch(1); 2601 final SettableFuture<String> resultOfCombiner = SettableFuture.create(); 2602 AsyncCallable<String> combiner = 2603 tagged( 2604 "Called my toString", 2605 new AsyncCallable<String>() { 2606 @Override 2607 public ListenableFuture<String> call() throws Exception { 2608 // Make this executor terminate after this task so that the test can tell when 2609 // futureResult has received resultOfCombiner. 2610 executor.shutdown(); 2611 callableBlocking.await(); 2612 return resultOfCombiner; 2613 } 2614 }); 2615 2616 ListenableFuture<String> futureResult = 2617 whenAllComplete(futureInteger, futureBoolean).callAsync(combiner, executor); 2618 2619 // Waiting on backing futures 2620 assertThat(futureResult.toString()) 2621 .matches( 2622 "CombinedFuture@\\w+\\[status=PENDING," 2623 + " info=\\[futures=\\[SettableFuture@\\w+\\[status=PENDING]," 2624 + " SettableFuture@\\w+\\[status=PENDING]]]]"); 2625 Integer integerPartial = 1; 2626 futureInteger.set(integerPartial); 2627 assertThat(futureResult.toString()) 2628 .matches( 2629 "CombinedFuture@\\w+\\[status=PENDING," 2630 + " info=\\[futures=\\[SettableFuture@\\w+\\[status=SUCCESS," 2631 + " result=\\[java.lang.Integer@\\w+]], SettableFuture@\\w+\\[status=PENDING]]]]"); 2632 2633 // Backing futures complete 2634 Boolean booleanPartial = true; 2635 futureBoolean.set(booleanPartial); 2636 // Once the backing futures are done there's a (brief) moment where we know nothing 2637 assertThat(futureResult.toString()).matches("CombinedFuture@\\w+\\[status=PENDING]"); 2638 callableBlocking.countDown(); 2639 // Need to wait for resultFuture to be returned. 2640 assertTrue(executor.awaitTermination(10, SECONDS)); 2641 // But once the async function has returned a future we can include that in the toString 2642 assertThat(futureResult.toString()) 2643 .matches( 2644 "CombinedFuture@\\w+\\[status=PENDING," 2645 + " setFuture=\\[SettableFuture@\\w+\\[status=PENDING]]]"); 2646 2647 // Future complete 2648 resultOfCombiner.set(createCombinedResult(getDone(futureInteger), getDone(futureBoolean))); 2649 String expectedResult = createCombinedResult(integerPartial, booleanPartial); 2650 assertEquals(expectedResult, futureResult.get()); 2651 assertThat(futureResult.toString()) 2652 .matches("CombinedFuture@\\w+\\[status=SUCCESS, result=\\[java.lang.String@\\w+]]"); 2653 } 2654 2655 public void testWhenAllComplete_asyncError() throws Exception { 2656 final Exception thrown = new RuntimeException("test"); 2657 2658 final SettableFuture<Integer> futureInteger = SettableFuture.create(); 2659 final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); 2660 AsyncCallable<String> combiner = 2661 new AsyncCallable<String>() { 2662 @Override 2663 public ListenableFuture<String> call() throws Exception { 2664 assertTrue(futureInteger.isDone()); 2665 assertTrue(futureBoolean.isDone()); 2666 return immediateFailedFuture(thrown); 2667 } 2668 }; 2669 2670 ListenableFuture<String> futureResult = 2671 whenAllComplete(futureInteger, futureBoolean).callAsync(combiner, directExecutor()); 2672 Integer integerPartial = 1; 2673 futureInteger.set(integerPartial); 2674 Boolean booleanPartial = true; 2675 futureBoolean.set(booleanPartial); 2676 2677 try { 2678 getDone(futureResult); 2679 fail(); 2680 } catch (ExecutionException expected) { 2681 assertSame(thrown, expected.getCause()); 2682 } 2683 } 2684 2685 @GwtIncompatible // threads 2686 2687 public void testWhenAllComplete_cancelledNotInterrupted() throws Exception { 2688 SettableFuture<String> stringFuture = SettableFuture.create(); 2689 SettableFuture<Boolean> booleanFuture = SettableFuture.create(); 2690 final CountDownLatch inFunction = new CountDownLatch(1); 2691 final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); 2692 final SettableFuture<String> resultFuture = SettableFuture.create(); 2693 AsyncCallable<String> combiner = 2694 new AsyncCallable<String>() { 2695 @Override 2696 public ListenableFuture<String> call() throws Exception { 2697 inFunction.countDown(); 2698 shouldCompleteFunction.await(); 2699 return resultFuture; 2700 } 2701 }; 2702 2703 ListenableFuture<String> futureResult = 2704 whenAllComplete(stringFuture, booleanFuture).callAsync(combiner, newSingleThreadExecutor()); 2705 2706 stringFuture.set("value"); 2707 booleanFuture.set(true); 2708 inFunction.await(); 2709 futureResult.cancel(false); 2710 shouldCompleteFunction.countDown(); 2711 try { 2712 futureResult.get(); 2713 fail(); 2714 } catch (CancellationException expected) { 2715 } 2716 2717 try { 2718 resultFuture.get(); 2719 fail(); 2720 } catch (CancellationException expected) { 2721 } 2722 } 2723 2724 @GwtIncompatible // threads 2725 2726 public void testWhenAllComplete_interrupted() throws Exception { 2727 SettableFuture<String> stringFuture = SettableFuture.create(); 2728 SettableFuture<Boolean> booleanFuture = SettableFuture.create(); 2729 final CountDownLatch inFunction = new CountDownLatch(1); 2730 final CountDownLatch gotException = new CountDownLatch(1); 2731 AsyncCallable<String> combiner = 2732 new AsyncCallable<String>() { 2733 @Override 2734 public ListenableFuture<String> call() throws Exception { 2735 inFunction.countDown(); 2736 try { 2737 new CountDownLatch(1).await(); // wait for interrupt 2738 } catch (InterruptedException expected) { 2739 gotException.countDown(); 2740 throw expected; 2741 } 2742 return immediateFuture("a"); 2743 } 2744 }; 2745 2746 ListenableFuture<String> futureResult = 2747 whenAllComplete(stringFuture, booleanFuture).callAsync(combiner, newSingleThreadExecutor()); 2748 2749 stringFuture.set("value"); 2750 booleanFuture.set(true); 2751 inFunction.await(); 2752 futureResult.cancel(true); 2753 try { 2754 futureResult.get(); 2755 fail(); 2756 } catch (CancellationException expected) { 2757 } 2758 gotException.await(); 2759 } 2760 2761 public void testWhenAllComplete_runnableResult() throws Exception { 2762 final SettableFuture<Integer> futureInteger = SettableFuture.create(); 2763 final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); 2764 final String[] result = new String[1]; 2765 Runnable combiner = 2766 new Runnable() { 2767 @Override 2768 public void run() { 2769 assertTrue(futureInteger.isDone()); 2770 assertTrue(futureBoolean.isDone()); 2771 result[0] = 2772 createCombinedResult( 2773 Futures.getUnchecked(futureInteger), Futures.getUnchecked(futureBoolean)); 2774 } 2775 }; 2776 2777 ListenableFuture<?> futureResult = 2778 whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor()); 2779 Integer integerPartial = 1; 2780 futureInteger.set(integerPartial); 2781 Boolean booleanPartial = true; 2782 futureBoolean.set(booleanPartial); 2783 futureResult.get(); 2784 assertEquals(createCombinedResult(integerPartial, booleanPartial), result[0]); 2785 } 2786 2787 public void testWhenAllComplete_runnableError() throws Exception { 2788 final RuntimeException thrown = new RuntimeException("test"); 2789 2790 final SettableFuture<Integer> futureInteger = SettableFuture.create(); 2791 final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); 2792 Runnable combiner = 2793 new Runnable() { 2794 @Override 2795 public void run() { 2796 assertTrue(futureInteger.isDone()); 2797 assertTrue(futureBoolean.isDone()); 2798 throw thrown; 2799 } 2800 }; 2801 2802 ListenableFuture<?> futureResult = 2803 whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor()); 2804 Integer integerPartial = 1; 2805 futureInteger.set(integerPartial); 2806 Boolean booleanPartial = true; 2807 futureBoolean.set(booleanPartial); 2808 2809 try { 2810 getDone(futureResult); 2811 fail(); 2812 } catch (ExecutionException expected) { 2813 assertSame(thrown, expected.getCause()); 2814 } 2815 } 2816 2817 @GwtIncompatible // threads 2818 2819 public void testWhenAllCompleteRunnable_resultCanceledWithoutInterrupt_doesNotInterruptRunnable() 2820 throws Exception { 2821 SettableFuture<String> stringFuture = SettableFuture.create(); 2822 SettableFuture<Boolean> booleanFuture = SettableFuture.create(); 2823 final CountDownLatch inFunction = new CountDownLatch(1); 2824 final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); 2825 final CountDownLatch combinerCompletedWithoutInterrupt = new CountDownLatch(1); 2826 Runnable combiner = 2827 new Runnable() { 2828 @Override 2829 public void run() { 2830 inFunction.countDown(); 2831 try { 2832 shouldCompleteFunction.await(); 2833 combinerCompletedWithoutInterrupt.countDown(); 2834 } catch (InterruptedException e) { 2835 // Ensure the thread's interrupt status is preserved. 2836 Thread.currentThread().interrupt(); 2837 throw new RuntimeException(e); 2838 } 2839 } 2840 }; 2841 2842 ListenableFuture<?> futureResult = 2843 whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor()); 2844 2845 stringFuture.set("value"); 2846 booleanFuture.set(true); 2847 inFunction.await(); 2848 futureResult.cancel(false); 2849 shouldCompleteFunction.countDown(); 2850 try { 2851 futureResult.get(); 2852 fail(); 2853 } catch (CancellationException expected) { 2854 } 2855 combinerCompletedWithoutInterrupt.await(); 2856 } 2857 2858 @GwtIncompatible // threads 2859 2860 public void testWhenAllCompleteRunnable_resultCanceledWithInterrupt_InterruptsRunnable() 2861 throws Exception { 2862 SettableFuture<String> stringFuture = SettableFuture.create(); 2863 SettableFuture<Boolean> booleanFuture = SettableFuture.create(); 2864 final CountDownLatch inFunction = new CountDownLatch(1); 2865 final CountDownLatch gotException = new CountDownLatch(1); 2866 Runnable combiner = 2867 new Runnable() { 2868 @Override 2869 public void run() { 2870 inFunction.countDown(); 2871 try { 2872 new CountDownLatch(1).await(); // wait for interrupt 2873 } catch (InterruptedException expected) { 2874 // Ensure the thread's interrupt status is preserved. 2875 Thread.currentThread().interrupt(); 2876 gotException.countDown(); 2877 } 2878 } 2879 }; 2880 2881 ListenableFuture<?> futureResult = 2882 whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor()); 2883 2884 stringFuture.set("value"); 2885 booleanFuture.set(true); 2886 inFunction.await(); 2887 futureResult.cancel(true); 2888 try { 2889 futureResult.get(); 2890 fail(); 2891 } catch (CancellationException expected) { 2892 } 2893 gotException.await(); 2894 } 2895 2896 public void testWhenAllSucceed() throws Exception { 2897 class PartialResultException extends Exception {} 2898 2899 final SettableFuture<Integer> futureInteger = SettableFuture.create(); 2900 final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); 2901 AsyncCallable<String> combiner = 2902 new AsyncCallable<String>() { 2903 @Override 2904 public ListenableFuture<String> call() throws Exception { 2905 throw new AssertionFailedError("AsyncCallable should not have been called."); 2906 } 2907 }; 2908 2909 ListenableFuture<String> futureResult = 2910 whenAllSucceed(futureInteger, futureBoolean).callAsync(combiner, directExecutor()); 2911 PartialResultException partialResultException = new PartialResultException(); 2912 futureInteger.setException(partialResultException); 2913 Boolean booleanPartial = true; 2914 futureBoolean.set(booleanPartial); 2915 try { 2916 getDone(futureResult); 2917 fail(); 2918 } catch (ExecutionException expected) { 2919 assertSame(partialResultException, expected.getCause()); 2920 } 2921 } 2922 2923 @AndroidIncompatible 2924 @GwtIncompatible 2925 public void testWhenAllSucceed_releasesInputFuturesUponSubmission() throws Exception { 2926 SettableFuture<Long> future1 = SettableFuture.create(); 2927 SettableFuture<Long> future2 = SettableFuture.create(); 2928 WeakReference<SettableFuture<Long>> future1Ref = new WeakReference<>(future1); 2929 WeakReference<SettableFuture<Long>> future2Ref = new WeakReference<>(future2); 2930 2931 Callable<Long> combiner = 2932 new Callable<Long>() { 2933 @Override 2934 public Long call() { 2935 throw new AssertionError(); 2936 } 2937 }; 2938 2939 ListenableFuture<Long> unused = 2940 whenAllSucceed(future1, future2).call(combiner, noOpScheduledExecutor()); 2941 2942 future1.set(1L); 2943 future1 = null; 2944 future2.set(2L); 2945 future2 = null; 2946 2947 /* 2948 * Futures should be collected even if combiner never runs. This is kind of a silly test, since 2949 * the combiner is almost certain to hold its own reference to the futures, and a real app would 2950 * hold a reference to the executor and thus to the combiner. What we really care about is that 2951 * the futures are released once the combiner is done running. But we happen to provide this 2952 * earlier cleanup at the moment, so we're testing it. 2953 */ 2954 GcFinalization.awaitClear(future1Ref); 2955 GcFinalization.awaitClear(future2Ref); 2956 } 2957 2958 @AndroidIncompatible 2959 @GwtIncompatible 2960 public void testWhenAllComplete_releasesInputFuturesUponCancellation() throws Exception { 2961 SettableFuture<Long> future = SettableFuture.create(); 2962 WeakReference<SettableFuture<Long>> futureRef = new WeakReference<>(future); 2963 2964 Callable<Long> combiner = 2965 new Callable<Long>() { 2966 @Override 2967 public Long call() { 2968 throw new AssertionError(); 2969 } 2970 }; 2971 2972 ListenableFuture<Long> unused = whenAllComplete(future).call(combiner, noOpScheduledExecutor()); 2973 2974 unused.cancel(false); 2975 future = null; 2976 2977 // Future should be collected because whenAll*Complete* doesn't need to look at its result. 2978 GcFinalization.awaitClear(futureRef); 2979 } 2980 2981 @AndroidIncompatible 2982 @GwtIncompatible 2983 public void testWhenAllSucceed_releasesCallable() throws Exception { 2984 AsyncCallable<Long> combiner = 2985 new AsyncCallable<Long>() { 2986 @Override 2987 public ListenableFuture<Long> call() { 2988 return SettableFuture.create(); 2989 } 2990 }; 2991 WeakReference<AsyncCallable<Long>> combinerRef = new WeakReference<>(combiner); 2992 2993 ListenableFuture<Long> unused = 2994 whenAllSucceed(immediateFuture(1L)).callAsync(combiner, directExecutor()); 2995 2996 combiner = null; 2997 // combiner should be collected even if the future it returns never completes. 2998 GcFinalization.awaitClear(combinerRef); 2999 } 3000 3001 /* 3002 * TODO(cpovirk): maybe pass around TestFuture instances instead of 3003 * ListenableFuture instances 3004 */ 3005 3006 /** 3007 * A future in {@link TestFutureBatch} that also has a name for debugging purposes and a {@code 3008 * finisher}, a task that will complete the future in some fashion when it is called, allowing for 3009 * testing both before and after the completion of the future. 3010 */ 3011 @GwtIncompatible // used only in GwtIncompatible tests 3012 private static final class TestFuture { 3013 3014 final ListenableFuture<String> future; 3015 final String name; 3016 final Runnable finisher; 3017 3018 TestFuture(ListenableFuture<String> future, String name, Runnable finisher) { 3019 this.future = future; 3020 this.name = name; 3021 this.finisher = finisher; 3022 } 3023 } 3024 3025 /** 3026 * A collection of several futures, covering cancellation, success, and failure (both {@link 3027 * ExecutionException} and {@link RuntimeException}), both immediate and delayed. We use each 3028 * possible pair of these futures in {@link FuturesTest#runExtensiveMergerTest}. 3029 * 3030 * <p>Each test requires a new {@link TestFutureBatch} because we need new delayed futures each 3031 * time, as the old delayed futures were completed as part of the old test. 3032 */ 3033 @GwtIncompatible // used only in GwtIncompatible tests 3034 private static final class TestFutureBatch { 3035 3036 final ListenableFuture<String> doneSuccess = immediateFuture("a"); 3037 final ListenableFuture<String> doneFailed = immediateFailedFuture(new Exception()); 3038 final SettableFuture<String> doneCancelled = SettableFuture.create(); 3039 3040 { 3041 doneCancelled.cancel(true); 3042 } 3043 3044 final ListenableFuture<String> doneRuntimeException = 3045 new ForwardingListenableFuture<String>() { 3046 final ListenableFuture<String> delegate = immediateFuture("Should never be seen"); 3047 3048 @Override 3049 protected ListenableFuture<String> delegate() { 3050 return delegate; 3051 } 3052 3053 @Override 3054 public String get() { 3055 throw new RuntimeException(); 3056 } 3057 3058 @Override 3059 public String get(long timeout, TimeUnit unit) { 3060 throw new RuntimeException(); 3061 } 3062 }; 3063 3064 final SettableFuture<String> delayedSuccess = SettableFuture.create(); 3065 final SettableFuture<String> delayedFailed = SettableFuture.create(); 3066 final SettableFuture<String> delayedCancelled = SettableFuture.create(); 3067 3068 final SettableFuture<String> delegateForDelayedRuntimeException = SettableFuture.create(); 3069 final ListenableFuture<String> delayedRuntimeException = 3070 new ForwardingListenableFuture<String>() { 3071 @Override 3072 protected ListenableFuture<String> delegate() { 3073 return delegateForDelayedRuntimeException; 3074 } 3075 3076 @Override 3077 public String get() throws ExecutionException, InterruptedException { 3078 delegateForDelayedRuntimeException.get(); 3079 throw new RuntimeException(); 3080 } 3081 3082 @Override 3083 public String get(long timeout, TimeUnit unit) 3084 throws ExecutionException, InterruptedException, TimeoutException { 3085 delegateForDelayedRuntimeException.get(timeout, unit); 3086 throw new RuntimeException(); 3087 } 3088 }; 3089 3090 final Runnable doNothing = 3091 new Runnable() { 3092 @Override 3093 public void run() {} 3094 }; 3095 final Runnable finishSuccess = 3096 new Runnable() { 3097 @Override 3098 public void run() { 3099 delayedSuccess.set("b"); 3100 } 3101 }; 3102 final Runnable finishFailure = 3103 new Runnable() { 3104 @Override 3105 public void run() { 3106 delayedFailed.setException(new Exception()); 3107 } 3108 }; 3109 final Runnable finishCancelled = 3110 new Runnable() { 3111 @Override 3112 public void run() { 3113 delayedCancelled.cancel(true); 3114 } 3115 }; 3116 final Runnable finishRuntimeException = 3117 new Runnable() { 3118 @Override 3119 public void run() { 3120 delegateForDelayedRuntimeException.set("Should never be seen"); 3121 } 3122 }; 3123 3124 /** All the futures, together with human-readable names for use by {@link #smartToString}. */ 3125 final ImmutableList<TestFuture> allFutures = 3126 ImmutableList.of( 3127 new TestFuture(doneSuccess, "doneSuccess", doNothing), 3128 new TestFuture(doneFailed, "doneFailed", doNothing), 3129 new TestFuture(doneCancelled, "doneCancelled", doNothing), 3130 new TestFuture(doneRuntimeException, "doneRuntimeException", doNothing), 3131 new TestFuture(delayedSuccess, "delayedSuccess", finishSuccess), 3132 new TestFuture(delayedFailed, "delayedFailed", finishFailure), 3133 new TestFuture(delayedCancelled, "delayedCancelled", finishCancelled), 3134 new TestFuture( 3135 delayedRuntimeException, "delayedRuntimeException", finishRuntimeException)); 3136 3137 final Function<ListenableFuture<String>, String> nameGetter = 3138 new Function<ListenableFuture<String>, String>() { 3139 @Override 3140 public String apply(ListenableFuture<String> input) { 3141 for (TestFuture future : allFutures) { 3142 if (future.future == input) { 3143 return future.name; 3144 } 3145 } 3146 throw new IllegalArgumentException(input.toString()); 3147 } 3148 }; 3149 3150 static boolean intersect(Set<?> a, Set<?> b) { 3151 return !intersection(a, b).isEmpty(); 3152 } 3153 3154 /** 3155 * Like {@code inputs.toString()}, but with the nonsense {@code toString} representations 3156 * replaced with the name of each future from {@link #allFutures}. 3157 */ 3158 String smartToString(ImmutableSet<ListenableFuture<String>> inputs) { 3159 Iterable<String> inputNames = Iterables.transform(inputs, nameGetter); 3160 return Joiner.on(", ").join(inputNames); 3161 } 3162 3163 void smartAssertTrue( 3164 ImmutableSet<ListenableFuture<String>> inputs, Exception cause, boolean expression) { 3165 if (!expression) { 3166 throw failureWithCause(cause, smartToString(inputs)); 3167 } 3168 } 3169 3170 boolean hasDelayed(ListenableFuture<String> a, ListenableFuture<String> b) { 3171 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3172 return intersect( 3173 inputs, 3174 ImmutableSet.of( 3175 delayedSuccess, delayedFailed, delayedCancelled, delayedRuntimeException)); 3176 } 3177 3178 void assertHasDelayed(ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { 3179 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3180 smartAssertTrue(inputs, e, hasDelayed(a, b)); 3181 } 3182 3183 void assertHasFailure(ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { 3184 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3185 smartAssertTrue( 3186 inputs, 3187 e, 3188 intersect( 3189 inputs, 3190 ImmutableSet.of( 3191 doneFailed, doneRuntimeException, delayedFailed, delayedRuntimeException))); 3192 } 3193 3194 void assertHasCancel(ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { 3195 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3196 smartAssertTrue( 3197 inputs, e, intersect(inputs, ImmutableSet.of(doneCancelled, delayedCancelled))); 3198 } 3199 3200 void assertHasImmediateFailure( 3201 ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { 3202 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3203 smartAssertTrue( 3204 inputs, e, intersect(inputs, ImmutableSet.of(doneFailed, doneRuntimeException))); 3205 } 3206 3207 void assertHasImmediateCancel( 3208 ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { 3209 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3210 smartAssertTrue(inputs, e, intersect(inputs, ImmutableSet.of(doneCancelled))); 3211 } 3212 } 3213 3214 /** 3215 * {@link Futures#allAsList(Iterable)} or {@link Futures#successfulAsList(Iterable)}, hidden 3216 * behind a common interface for testing. 3217 */ 3218 @GwtIncompatible // used only in GwtIncompatible tests 3219 private interface Merger { 3220 3221 ListenableFuture<List<String>> merged(ListenableFuture<String> a, ListenableFuture<String> b); 3222 3223 Merger allMerger = 3224 new Merger() { 3225 @Override 3226 public ListenableFuture<List<String>> merged( 3227 ListenableFuture<String> a, ListenableFuture<String> b) { 3228 return allAsList(ImmutableSet.of(a, b)); 3229 } 3230 }; 3231 Merger successMerger = 3232 new Merger() { 3233 @Override 3234 public ListenableFuture<List<String>> merged( 3235 ListenableFuture<String> a, ListenableFuture<String> b) { 3236 return successfulAsList(ImmutableSet.of(a, b)); 3237 } 3238 }; 3239 } 3240 3241 /** 3242 * Very rough equivalent of a timed get, produced by calling the no-arg get method in another 3243 * thread and waiting a short time for it. 3244 * 3245 * <p>We need this to test the behavior of no-arg get methods without hanging the main test thread 3246 * forever in the case of failure. 3247 */ 3248 @CanIgnoreReturnValue 3249 @GwtIncompatible // threads 3250 static <V> V pseudoTimedGetUninterruptibly(final Future<V> input, long timeout, TimeUnit unit) 3251 throws ExecutionException, TimeoutException { 3252 ExecutorService executor = newSingleThreadExecutor(); 3253 Future<V> waiter = 3254 executor.submit( 3255 new Callable<V>() { 3256 @Override 3257 public V call() throws Exception { 3258 return input.get(); 3259 } 3260 }); 3261 3262 try { 3263 return getUninterruptibly(waiter, timeout, unit); 3264 } catch (ExecutionException e) { 3265 propagateIfInstanceOf(e.getCause(), ExecutionException.class); 3266 propagateIfInstanceOf(e.getCause(), CancellationException.class); 3267 throw failureWithCause(e, "Unexpected exception"); 3268 } finally { 3269 executor.shutdownNow(); 3270 // TODO(cpovirk): assertTrue(awaitTerminationUninterruptibly(executor, 10, SECONDS)); 3271 } 3272 } 3273 3274 /** 3275 * For each possible pair of futures from {@link TestFutureBatch}, for each possible completion 3276 * order of those futures, test that various get calls (timed before future completion, untimed 3277 * before future completion, and untimed after future completion) return or throw the proper 3278 * values. 3279 */ 3280 @GwtIncompatible // used only in GwtIncompatible tests 3281 private static void runExtensiveMergerTest(Merger merger) throws InterruptedException { 3282 int inputCount = new TestFutureBatch().allFutures.size(); 3283 3284 for (int i = 0; i < inputCount; i++) { 3285 for (int j = 0; j < inputCount; j++) { 3286 for (boolean iBeforeJ : new boolean[] {true, false}) { 3287 TestFutureBatch inputs = new TestFutureBatch(); 3288 ListenableFuture<String> iFuture = inputs.allFutures.get(i).future; 3289 ListenableFuture<String> jFuture = inputs.allFutures.get(j).future; 3290 ListenableFuture<List<String>> future = merger.merged(iFuture, jFuture); 3291 3292 // Test timed get before we've completed any delayed futures. 3293 try { 3294 List<String> result = future.get(0, MILLISECONDS); 3295 assertTrue("Got " + result, asList("a", null).containsAll(result)); 3296 } catch (CancellationException e) { 3297 assertTrue(merger == Merger.allMerger); 3298 inputs.assertHasImmediateCancel(iFuture, jFuture, e); 3299 } catch (ExecutionException e) { 3300 assertTrue(merger == Merger.allMerger); 3301 inputs.assertHasImmediateFailure(iFuture, jFuture, e); 3302 } catch (TimeoutException e) { 3303 inputs.assertHasDelayed(iFuture, jFuture, e); 3304 } 3305 3306 // Same tests with pseudoTimedGet. 3307 try { 3308 List<String> result = 3309 conditionalPseudoTimedGetUninterruptibly( 3310 inputs, iFuture, jFuture, future, 20, MILLISECONDS); 3311 assertTrue("Got " + result, asList("a", null).containsAll(result)); 3312 } catch (CancellationException e) { 3313 assertTrue(merger == Merger.allMerger); 3314 inputs.assertHasImmediateCancel(iFuture, jFuture, e); 3315 } catch (ExecutionException e) { 3316 assertTrue(merger == Merger.allMerger); 3317 inputs.assertHasImmediateFailure(iFuture, jFuture, e); 3318 } catch (TimeoutException e) { 3319 inputs.assertHasDelayed(iFuture, jFuture, e); 3320 } 3321 3322 // Finish the two futures in the currently specified order: 3323 inputs.allFutures.get(iBeforeJ ? i : j).finisher.run(); 3324 inputs.allFutures.get(iBeforeJ ? j : i).finisher.run(); 3325 3326 // Test untimed get now that we've completed any delayed futures. 3327 try { 3328 List<String> result = getDone(future); 3329 assertTrue("Got " + result, asList("a", "b", null).containsAll(result)); 3330 } catch (CancellationException e) { 3331 assertTrue(merger == Merger.allMerger); 3332 inputs.assertHasCancel(iFuture, jFuture, e); 3333 } catch (ExecutionException e) { 3334 assertTrue(merger == Merger.allMerger); 3335 inputs.assertHasFailure(iFuture, jFuture, e); 3336 } 3337 } 3338 } 3339 } 3340 } 3341 3342 /** 3343 * Call the non-timed {@link Future#get()} in a way that allows us to abort if it's expected to 3344 * hang forever. More precisely, if it's expected to return, we simply call it[*], but if it's 3345 * expected to hang (because one of the input futures that we know makes it up isn't done yet), 3346 * then we call it in a separate thread (using pseudoTimedGet). The result is that we wait as long 3347 * as necessary when the method is expected to return (at the cost of hanging forever if there is 3348 * a bug in the class under test) but that we time out fairly promptly when the method is expected 3349 * to hang (possibly too quickly, but too-quick failures should be very unlikely, given that we 3350 * used to bail after 20ms during the expected-successful tests, and there we saw a failure rate 3351 * of ~1/5000, meaning that the other thread's get() call nearly always completes within 20ms if 3352 * it's going to complete at all). 3353 * 3354 * <p>[*] To avoid hangs, I've disabled the in-thread calls. This makes the test take (very 3355 * roughly) 2.5s longer. (2.5s is also the maximum length of time we will wait for a timed get 3356 * that is expected to succeed; the fact that the numbers match is only a coincidence.) See the 3357 * comment below for how to restore the fast but hang-y version. 3358 */ 3359 @GwtIncompatible // used only in GwtIncompatible tests 3360 private static List<String> conditionalPseudoTimedGetUninterruptibly( 3361 TestFutureBatch inputs, 3362 ListenableFuture<String> iFuture, 3363 ListenableFuture<String> jFuture, 3364 ListenableFuture<List<String>> future, 3365 int timeout, 3366 TimeUnit unit) 3367 throws ExecutionException, TimeoutException { 3368 /* 3369 * For faster tests (that may hang indefinitely if the class under test has 3370 * a bug!), switch the second branch to call untimed future.get() instead of 3371 * pseudoTimedGet. 3372 */ 3373 return (inputs.hasDelayed(iFuture, jFuture)) 3374 ? pseudoTimedGetUninterruptibly(future, timeout, unit) 3375 : pseudoTimedGetUninterruptibly(future, 2500, MILLISECONDS); 3376 } 3377 3378 @GwtIncompatible // threads 3379 public void testAllAsList_extensive() throws InterruptedException { 3380 runExtensiveMergerTest(Merger.allMerger); 3381 } 3382 3383 @GwtIncompatible // threads 3384 public void testSuccessfulAsList_extensive() throws InterruptedException { 3385 runExtensiveMergerTest(Merger.successMerger); 3386 } 3387 3388 public void testSuccessfulAsList() throws Exception { 3389 // Create input and output 3390 SettableFuture<String> future1 = SettableFuture.create(); 3391 SettableFuture<String> future2 = SettableFuture.create(); 3392 SettableFuture<String> future3 = SettableFuture.create(); 3393 @SuppressWarnings("unchecked") // array is never modified 3394 ListenableFuture<List<String>> compound = successfulAsList(future1, future2, future3); 3395 3396 // Attach a listener 3397 SingleCallListener listener = new SingleCallListener(); 3398 compound.addListener(listener, directExecutor()); 3399 3400 // Satisfy each input and check the output 3401 assertFalse(compound.isDone()); 3402 future1.set(DATA1); 3403 assertFalse(compound.isDone()); 3404 future2.set(DATA2); 3405 assertFalse(compound.isDone()); 3406 listener.expectCall(); 3407 future3.set(DATA3); 3408 assertTrue(listener.wasCalled()); 3409 3410 List<String> results = getDone(compound); 3411 assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder(); 3412 } 3413 3414 public void testSuccessfulAsList_emptyList() throws Exception { 3415 SingleCallListener listener = new SingleCallListener(); 3416 listener.expectCall(); 3417 List<ListenableFuture<String>> futures = ImmutableList.of(); 3418 ListenableFuture<List<String>> compound = successfulAsList(futures); 3419 compound.addListener(listener, directExecutor()); 3420 assertThat(getDone(compound)).isEmpty(); 3421 assertTrue(listener.wasCalled()); 3422 } 3423 3424 public void testSuccessfulAsList_emptyArray() throws Exception { 3425 SingleCallListener listener = new SingleCallListener(); 3426 listener.expectCall(); 3427 @SuppressWarnings("unchecked") // array is never modified 3428 ListenableFuture<List<String>> compound = successfulAsList(); 3429 compound.addListener(listener, directExecutor()); 3430 assertThat(getDone(compound)).isEmpty(); 3431 assertTrue(listener.wasCalled()); 3432 } 3433 3434 public void testSuccessfulAsList_partialFailure() throws Exception { 3435 SingleCallListener listener = new SingleCallListener(); 3436 SettableFuture<String> future1 = SettableFuture.create(); 3437 SettableFuture<String> future2 = SettableFuture.create(); 3438 @SuppressWarnings("unchecked") // array is never modified 3439 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3440 compound.addListener(listener, directExecutor()); 3441 3442 assertFalse(compound.isDone()); 3443 future1.setException(new Throwable("failed1")); 3444 assertFalse(compound.isDone()); 3445 listener.expectCall(); 3446 future2.set(DATA2); 3447 assertTrue(listener.wasCalled()); 3448 3449 List<String> results = getDone(compound); 3450 assertThat(results).containsExactly(null, DATA2).inOrder(); 3451 } 3452 3453 public void testSuccessfulAsList_totalFailure() throws Exception { 3454 SingleCallListener listener = new SingleCallListener(); 3455 SettableFuture<String> future1 = SettableFuture.create(); 3456 SettableFuture<String> future2 = SettableFuture.create(); 3457 @SuppressWarnings("unchecked") // array is never modified 3458 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3459 compound.addListener(listener, directExecutor()); 3460 3461 assertFalse(compound.isDone()); 3462 future1.setException(new Throwable("failed1")); 3463 assertFalse(compound.isDone()); 3464 listener.expectCall(); 3465 future2.setException(new Throwable("failed2")); 3466 assertTrue(listener.wasCalled()); 3467 3468 List<String> results = getDone(compound); 3469 assertThat(results).containsExactly(null, null).inOrder(); 3470 } 3471 3472 public void testSuccessfulAsList_cancelled() throws Exception { 3473 SingleCallListener listener = new SingleCallListener(); 3474 SettableFuture<String> future1 = SettableFuture.create(); 3475 SettableFuture<String> future2 = SettableFuture.create(); 3476 @SuppressWarnings("unchecked") // array is never modified 3477 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3478 compound.addListener(listener, directExecutor()); 3479 3480 assertFalse(compound.isDone()); 3481 future1.cancel(true); 3482 assertFalse(compound.isDone()); 3483 listener.expectCall(); 3484 future2.set(DATA2); 3485 assertTrue(listener.wasCalled()); 3486 3487 List<String> results = getDone(compound); 3488 assertThat(results).containsExactly(null, DATA2).inOrder(); 3489 } 3490 3491 public void testSuccessfulAsList_resultCancelled() throws Exception { 3492 SettableFuture<String> future1 = SettableFuture.create(); 3493 SettableFuture<String> future2 = SettableFuture.create(); 3494 @SuppressWarnings("unchecked") // array is never modified 3495 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3496 3497 future2.set(DATA2); 3498 assertFalse(compound.isDone()); 3499 assertTrue(compound.cancel(false)); 3500 assertTrue(compound.isCancelled()); 3501 assertTrue(future1.isCancelled()); 3502 assertFalse(future1.wasInterrupted()); 3503 } 3504 3505 public void testSuccessfulAsList_resultCancelledRacingInputDone() throws Exception { 3506 TestLogHandler listenerLoggerHandler = new TestLogHandler(); 3507 Logger exceptionLogger = Logger.getLogger(AbstractFuture.class.getName()); 3508 exceptionLogger.addHandler(listenerLoggerHandler); 3509 try { 3510 doTestSuccessfulAsList_resultCancelledRacingInputDone(); 3511 3512 assertWithMessage("Nothing should be logged") 3513 .that(listenerLoggerHandler.getStoredLogRecords()) 3514 .isEmpty(); 3515 } finally { 3516 exceptionLogger.removeHandler(listenerLoggerHandler); 3517 } 3518 } 3519 3520 private static void doTestSuccessfulAsList_resultCancelledRacingInputDone() throws Exception { 3521 // Simple (combined.cancel -> input.cancel -> setOneValue): 3522 successfulAsList(ImmutableList.of(SettableFuture.create())).cancel(true); 3523 3524 /* 3525 * Complex (combined.cancel -> input.cancel -> other.set -> setOneValue), 3526 * to show that this isn't just about problems with the input future we just 3527 * cancelled: 3528 */ 3529 final SettableFuture<String> future1 = SettableFuture.create(); 3530 final SettableFuture<String> future2 = SettableFuture.create(); 3531 @SuppressWarnings("unchecked") // array is never modified 3532 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3533 3534 future1.addListener( 3535 new Runnable() { 3536 @Override 3537 public void run() { 3538 assertTrue(future1.isCancelled()); 3539 /* 3540 * This test relies on behavior that's unspecified but currently 3541 * guaranteed by the implementation: Cancellation of inputs is 3542 * performed in the order they were provided to the constructor. Verify 3543 * that as a sanity check: 3544 */ 3545 assertFalse(future2.isCancelled()); 3546 // Now attempt to trigger the exception: 3547 future2.set(DATA2); 3548 } 3549 }, 3550 directExecutor()); 3551 assertTrue(compound.cancel(false)); 3552 assertTrue(compound.isCancelled()); 3553 assertTrue(future1.isCancelled()); 3554 assertFalse(future2.isCancelled()); 3555 3556 try { 3557 getDone(compound); 3558 fail(); 3559 } catch (CancellationException expected) { 3560 } 3561 } 3562 3563 public void testSuccessfulAsList_resultInterrupted() throws Exception { 3564 SettableFuture<String> future1 = SettableFuture.create(); 3565 SettableFuture<String> future2 = SettableFuture.create(); 3566 @SuppressWarnings("unchecked") // array is never modified 3567 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3568 3569 future2.set(DATA2); 3570 assertFalse(compound.isDone()); 3571 assertTrue(compound.cancel(true)); 3572 assertTrue(compound.isCancelled()); 3573 assertTrue(future1.isCancelled()); 3574 assertTrue(future1.wasInterrupted()); 3575 } 3576 3577 public void testSuccessfulAsList_mixed() throws Exception { 3578 SingleCallListener listener = new SingleCallListener(); 3579 SettableFuture<String> future1 = SettableFuture.create(); 3580 SettableFuture<String> future2 = SettableFuture.create(); 3581 SettableFuture<String> future3 = SettableFuture.create(); 3582 @SuppressWarnings("unchecked") // array is never modified 3583 ListenableFuture<List<String>> compound = successfulAsList(future1, future2, future3); 3584 compound.addListener(listener, directExecutor()); 3585 3586 // First is cancelled, second fails, third succeeds 3587 assertFalse(compound.isDone()); 3588 future1.cancel(true); 3589 assertFalse(compound.isDone()); 3590 future2.setException(new Throwable("failed2")); 3591 assertFalse(compound.isDone()); 3592 listener.expectCall(); 3593 future3.set(DATA3); 3594 assertTrue(listener.wasCalled()); 3595 3596 List<String> results = getDone(compound); 3597 assertThat(results).containsExactly(null, null, DATA3).inOrder(); 3598 } 3599 3600 /** Non-Error exceptions are never logged. */ 3601 @SuppressWarnings("unchecked") 3602 public void testSuccessfulAsList_logging_exception() throws Exception { 3603 assertEquals( 3604 newArrayList((Object) null), 3605 getDone(successfulAsList(immediateFailedFuture(new MyException())))); 3606 assertWithMessage("Nothing should be logged") 3607 .that(aggregateFutureLogHandler.getStoredLogRecords()) 3608 .isEmpty(); 3609 3610 // Not even if there are a bunch of failures. 3611 assertEquals( 3612 newArrayList(null, null, null), 3613 getDone( 3614 successfulAsList( 3615 immediateFailedFuture(new MyException()), 3616 immediateFailedFuture(new MyException()), 3617 immediateFailedFuture(new MyException())))); 3618 assertWithMessage("Nothing should be logged") 3619 .that(aggregateFutureLogHandler.getStoredLogRecords()) 3620 .isEmpty(); 3621 } 3622 3623 /** Ensure that errors are always logged. */ 3624 @SuppressWarnings("unchecked") 3625 public void testSuccessfulAsList_logging_error() throws Exception { 3626 assertEquals( 3627 newArrayList((Object) null), 3628 getDone(successfulAsList(immediateFailedFuture(new MyError())))); 3629 List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); 3630 assertThat(logged).hasSize(1); // errors are always logged 3631 assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class); 3632 } 3633 3634 public void testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled() throws Exception { 3635 ListenableFuture<String> input = new CancelPanickingFuture<>(); 3636 ListenableFuture<List<String>> output = successfulAsList(input); 3637 output.cancel(false); 3638 3639 List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); 3640 assertThat(logged).hasSize(1); 3641 assertThat(logged.get(0).getThrown()).hasMessageThat().isEqualTo("You can't fire me, I quit."); 3642 } 3643 3644 private static final class CancelPanickingFuture<V> extends AbstractFuture<V> { 3645 @Override 3646 public boolean cancel(boolean mayInterruptIfRunning) { 3647 setException(new Error("You can't fire me, I quit.")); 3648 return false; 3649 } 3650 } 3651 3652 public void testNonCancellationPropagating_successful() throws Exception { 3653 SettableFuture<Foo> input = SettableFuture.create(); 3654 ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); 3655 Foo foo = new Foo(); 3656 3657 assertFalse(wrapper.isDone()); 3658 input.set(foo); 3659 assertTrue(wrapper.isDone()); 3660 assertSame(foo, getDone(wrapper)); 3661 } 3662 3663 public void testNonCancellationPropagating_failure() throws Exception { 3664 SettableFuture<Foo> input = SettableFuture.create(); 3665 ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); 3666 Throwable failure = new Throwable("thrown"); 3667 3668 assertFalse(wrapper.isDone()); 3669 input.setException(failure); 3670 try { 3671 getDone(wrapper); 3672 fail(); 3673 } catch (ExecutionException expected) { 3674 assertSame(failure, expected.getCause()); 3675 } 3676 } 3677 3678 public void testNonCancellationPropagating_delegateCancelled() throws Exception { 3679 SettableFuture<Foo> input = SettableFuture.create(); 3680 ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); 3681 3682 assertFalse(wrapper.isDone()); 3683 assertTrue(input.cancel(false)); 3684 assertTrue(wrapper.isCancelled()); 3685 } 3686 3687 public void testNonCancellationPropagating_doesNotPropagate() throws Exception { 3688 SettableFuture<Foo> input = SettableFuture.create(); 3689 ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); 3690 3691 assertTrue(wrapper.cancel(true)); 3692 assertTrue(wrapper.isCancelled()); 3693 assertTrue(wrapper.isDone()); 3694 assertFalse(input.isCancelled()); 3695 assertFalse(input.isDone()); 3696 } 3697 3698 @GwtIncompatible // used only in GwtIncompatible tests 3699 private static class TestException extends Exception { 3700 3701 TestException(@Nullable Throwable cause) { 3702 super(cause); 3703 } 3704 } 3705 3706 @GwtIncompatible // used only in GwtIncompatible tests 3707 private interface MapperFunction extends Function<Throwable, Exception> {} 3708 3709 public void testCompletionOrder() throws Exception { 3710 SettableFuture<Long> future1 = SettableFuture.create(); 3711 SettableFuture<Long> future2 = SettableFuture.create(); 3712 SettableFuture<Long> future3 = SettableFuture.create(); 3713 SettableFuture<Long> future4 = SettableFuture.create(); 3714 SettableFuture<Long> future5 = SettableFuture.create(); 3715 3716 ImmutableList<ListenableFuture<Long>> futures = 3717 inCompletionOrder( 3718 ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5)); 3719 future2.set(1L); 3720 future5.set(2L); 3721 future1.set(3L); 3722 future3.set(4L); 3723 future4.set(5L); 3724 3725 long expectedResult = 1L; 3726 for (ListenableFuture<Long> future : futures) { 3727 assertEquals((Long) expectedResult, getDone(future)); 3728 expectedResult++; 3729 } 3730 } 3731 3732 public void testCompletionOrderExceptionThrown() throws Exception { 3733 SettableFuture<Long> future1 = SettableFuture.create(); 3734 SettableFuture<Long> future2 = SettableFuture.create(); 3735 SettableFuture<Long> future3 = SettableFuture.create(); 3736 SettableFuture<Long> future4 = SettableFuture.create(); 3737 SettableFuture<Long> future5 = SettableFuture.create(); 3738 3739 ImmutableList<ListenableFuture<Long>> futures = 3740 inCompletionOrder( 3741 ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5)); 3742 future2.set(1L); 3743 future5.setException(new IllegalStateException("2L")); 3744 future1.set(3L); 3745 future3.set(4L); 3746 future4.set(5L); 3747 3748 long expectedResult = 1L; 3749 for (ListenableFuture<Long> future : futures) { 3750 if (expectedResult != 2) { 3751 assertEquals((Long) expectedResult, getDone(future)); 3752 } else { 3753 try { 3754 getDone(future); 3755 fail(); 3756 } catch (ExecutionException expected) { 3757 assertThat(expected).hasCauseThat().hasMessageThat().isEqualTo("2L"); 3758 } 3759 } 3760 expectedResult++; 3761 } 3762 } 3763 3764 public void testCompletionOrderFutureCancelled() throws Exception { 3765 SettableFuture<Long> future1 = SettableFuture.create(); 3766 SettableFuture<Long> future2 = SettableFuture.create(); 3767 SettableFuture<Long> future3 = SettableFuture.create(); 3768 SettableFuture<Long> future4 = SettableFuture.create(); 3769 SettableFuture<Long> future5 = SettableFuture.create(); 3770 3771 ImmutableList<ListenableFuture<Long>> futures = 3772 inCompletionOrder( 3773 ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5)); 3774 future2.set(1L); 3775 future5.set(2L); 3776 future1.set(3L); 3777 future3.cancel(true); 3778 future4.set(5L); 3779 3780 long expectedResult = 1L; 3781 for (ListenableFuture<Long> future : futures) { 3782 if (expectedResult != 4) { 3783 assertEquals((Long) expectedResult, getDone(future)); 3784 } else { 3785 try { 3786 getDone(future); 3787 fail(); 3788 } catch (CancellationException expected) { 3789 } 3790 } 3791 expectedResult++; 3792 } 3793 } 3794 3795 public void testCompletionOrderFutureInterruption() throws Exception { 3796 SettableFuture<Long> future1 = SettableFuture.create(); 3797 SettableFuture<Long> future2 = SettableFuture.create(); 3798 SettableFuture<Long> future3 = SettableFuture.create(); 3799 3800 ImmutableList<ListenableFuture<Long>> futures = 3801 inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3)); 3802 future2.set(1L); 3803 3804 futures.get(1).cancel(true); 3805 futures.get(2).cancel(false); 3806 3807 assertTrue(future1.isCancelled()); 3808 assertFalse(future1.wasInterrupted()); 3809 assertTrue(future3.isCancelled()); 3810 assertFalse(future3.wasInterrupted()); 3811 } 3812 3813 public void testCancellingADelegatePropagates() throws Exception { 3814 SettableFuture<Long> future1 = SettableFuture.create(); 3815 SettableFuture<Long> future2 = SettableFuture.create(); 3816 SettableFuture<Long> future3 = SettableFuture.create(); 3817 3818 ImmutableList<ListenableFuture<Long>> delegates = 3819 inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3)); 3820 3821 future1.set(1L); 3822 // Cannot cancel a complete delegate 3823 assertFalse(delegates.get(0).cancel(true)); 3824 // Cancel the delegate before the input future is done 3825 assertTrue(delegates.get(1).cancel(true)); 3826 // Setting the future still works since cancellation didn't propagate 3827 assertTrue(future2.set(2L)); 3828 // Second check to ensure the input future was not cancelled 3829 assertEquals((Long) 2L, getDone(future2)); 3830 3831 // All futures are now complete; outstanding inputs are cancelled 3832 assertTrue(future3.isCancelled()); 3833 assertTrue(future3.wasInterrupted()); 3834 } 3835 3836 @AndroidIncompatible // runs out of memory under some versions of the emulator 3837 public void testCancellingAllDelegatesIsNotQuadratic() throws Exception { 3838 ImmutableList.Builder<SettableFuture<Long>> builder = ImmutableList.builder(); 3839 for (int i = 0; i < 500_000; i++) { 3840 builder.add(SettableFuture.<Long>create()); 3841 } 3842 ImmutableList<SettableFuture<Long>> inputs = builder.build(); 3843 ImmutableList<ListenableFuture<Long>> delegates = inCompletionOrder(inputs); 3844 3845 for (ListenableFuture<?> delegate : delegates) { 3846 delegate.cancel(true); 3847 } 3848 3849 for (ListenableFuture<?> input : inputs) { 3850 assertTrue(input.isDone()); 3851 } 3852 } 3853 3854 @AndroidIncompatible // reference is never cleared under some versions of the emulator 3855 @GwtIncompatible 3856 public void testInputGCedIfUnreferenced() throws Exception { 3857 SettableFuture<Long> future1 = SettableFuture.create(); 3858 SettableFuture<Long> future2 = SettableFuture.create(); 3859 WeakReference<SettableFuture<Long>> future1Ref = new WeakReference<>(future1); 3860 WeakReference<SettableFuture<Long>> future2Ref = new WeakReference<>(future2); 3861 3862 ImmutableList<ListenableFuture<Long>> delegates = 3863 inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2)); 3864 3865 future1.set(1L); 3866 3867 future1 = null; 3868 // First future is complete, should be unreferenced 3869 GcFinalization.awaitClear(future1Ref); 3870 ListenableFuture<Long> outputFuture1 = delegates.get(0); 3871 delegates = null; 3872 future2 = null; 3873 // No references to list or other output future, second future should be unreferenced 3874 GcFinalization.awaitClear(future2Ref); 3875 outputFuture1.get(); 3876 } 3877 3878 // Mostly an example of how it would look like to use a list of mixed types 3879 public void testCompletionOrderMixedBagOTypes() throws Exception { 3880 SettableFuture<Long> future1 = SettableFuture.create(); 3881 SettableFuture<String> future2 = SettableFuture.create(); 3882 SettableFuture<Integer> future3 = SettableFuture.create(); 3883 3884 ImmutableList<? extends ListenableFuture<?>> inputs = 3885 ImmutableList.<ListenableFuture<?>>of(future1, future2, future3); 3886 ImmutableList<ListenableFuture<Object>> futures = inCompletionOrder(inputs); 3887 future2.set("1L"); 3888 future1.set(2L); 3889 future3.set(3); 3890 3891 ImmutableList<?> expected = ImmutableList.of("1L", 2L, 3); 3892 for (int i = 0; i < expected.size(); i++) { 3893 assertEquals(expected.get(i), getDone(futures.get(i))); 3894 } 3895 } 3896 3897 @GwtIncompatible // ClassSanityTester 3898 public void testFutures_nullChecks() throws Exception { 3899 new ClassSanityTester() 3900 .forAllPublicStaticMethods(Futures.class) 3901 .thatReturn(Future.class) 3902 .testNulls(); 3903 } 3904 3905 static AssertionFailedError failureWithCause(Throwable cause, String message) { 3906 AssertionFailedError failure = new AssertionFailedError(message); 3907 failure.initCause(cause); 3908 return failure; 3909 } 3910 3911 // This test covers a bug where an Error thrown from a callback could cause the TimeoutFuture to 3912 // never complete when timing out. Notably, nothing would get logged since the Error would get 3913 // stuck in the ScheduledFuture inside of TimeoutFuture and nothing ever calls get on it. 3914 3915 // Simulate a timeout that fires before the call the SES.schedule returns but the future is 3916 // already completed. 3917 3918 // This test covers a bug where an Error thrown from a callback could cause the TimeoutFuture to 3919 // never complete when timing out. Notably, nothing would get logged since the Error would get 3920 // stuck in the ScheduledFuture inside of TimeoutFuture and nothing ever calls get on it. 3921 3922 private static final Executor REJECTING_EXECUTOR = 3923 new Executor() { 3924 @Override 3925 public void execute(Runnable runnable) { 3926 throw new RejectedExecutionException(); 3927 } 3928 }; 3929 3930 private static <V> AsyncFunction<V, V> asyncIdentity() { 3931 return new AsyncFunction<V, V>() { 3932 @Override 3933 public ListenableFuture<V> apply(V input) { 3934 return immediateFuture(input); 3935 } 3936 }; 3937 } 3938 3939 private static <I, O> AsyncFunction<I, O> tagged( 3940 final String toString, final AsyncFunction<I, O> function) { 3941 return new AsyncFunction<I, O>() { 3942 @Override 3943 public ListenableFuture<O> apply(I input) throws Exception { 3944 return function.apply(input); 3945 } 3946 3947 @Override 3948 public String toString() { 3949 return toString; 3950 } 3951 }; 3952 } 3953 3954 private static <V> AsyncCallable<V> tagged( 3955 final String toString, final AsyncCallable<V> callable) { 3956 return new AsyncCallable<V>() { 3957 @Override 3958 public ListenableFuture<V> call() throws Exception { 3959 return callable.call(); 3960 } 3961 3962 @Override 3963 public String toString() { 3964 return toString; 3965 } 3966 }; 3967 } 3968 } 3969