1 /* 2 * Copyright (C) 2008 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.Functions.constant; 20 import static com.google.common.base.Functions.identity; 21 import static com.google.common.base.Throwables.propagateIfInstanceOf; 22 import static com.google.common.collect.Iterables.getOnlyElement; 23 import static com.google.common.collect.Lists.newArrayList; 24 import static com.google.common.collect.Sets.intersection; 25 import static com.google.common.truth.Truth.assertThat; 26 import static com.google.common.truth.Truth.assertWithMessage; 27 import static com.google.common.util.concurrent.Futures.allAsList; 28 import static com.google.common.util.concurrent.Futures.catching; 29 import static com.google.common.util.concurrent.Futures.catchingAsync; 30 import static com.google.common.util.concurrent.Futures.getDone; 31 import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; 32 import static com.google.common.util.concurrent.Futures.immediateFailedFuture; 33 import static com.google.common.util.concurrent.Futures.immediateFuture; 34 import static com.google.common.util.concurrent.Futures.immediateVoidFuture; 35 import static com.google.common.util.concurrent.Futures.inCompletionOrder; 36 import static com.google.common.util.concurrent.Futures.lazyTransform; 37 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 38 import static com.google.common.util.concurrent.Futures.scheduleAsync; 39 import static com.google.common.util.concurrent.Futures.submit; 40 import static com.google.common.util.concurrent.Futures.submitAsync; 41 import static com.google.common.util.concurrent.Futures.successfulAsList; 42 import static com.google.common.util.concurrent.Futures.transform; 43 import static com.google.common.util.concurrent.Futures.transformAsync; 44 import static com.google.common.util.concurrent.Futures.whenAllComplete; 45 import static com.google.common.util.concurrent.Futures.whenAllSucceed; 46 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 47 import static com.google.common.util.concurrent.TestPlatform.clearInterrupt; 48 import static com.google.common.util.concurrent.TestPlatform.getDoneFromTimeoutOverload; 49 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; 50 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 51 import static com.google.common.util.concurrent.testing.TestingExecutors.noOpScheduledExecutor; 52 import static java.util.Arrays.asList; 53 import static java.util.concurrent.Executors.newSingleThreadExecutor; 54 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; 55 import static java.util.concurrent.TimeUnit.MILLISECONDS; 56 import static java.util.concurrent.TimeUnit.NANOSECONDS; 57 import static java.util.concurrent.TimeUnit.SECONDS; 58 59 import com.google.common.annotations.GwtCompatible; 60 import com.google.common.annotations.GwtIncompatible; 61 import com.google.common.base.Function; 62 import com.google.common.base.Joiner; 63 import com.google.common.base.Predicate; 64 import com.google.common.collect.ImmutableList; 65 import com.google.common.collect.ImmutableSet; 66 import com.google.common.collect.Iterables; 67 import com.google.common.testing.ClassSanityTester; 68 import com.google.common.testing.GcFinalization; 69 import com.google.common.testing.TestLogHandler; 70 import com.google.errorprone.annotations.CanIgnoreReturnValue; 71 import java.io.FileNotFoundException; 72 import java.io.IOException; 73 import java.lang.ref.WeakReference; 74 import java.util.List; 75 import java.util.Set; 76 import java.util.concurrent.Callable; 77 import java.util.concurrent.CancellationException; 78 import java.util.concurrent.CountDownLatch; 79 import java.util.concurrent.ExecutionException; 80 import java.util.concurrent.Executor; 81 import java.util.concurrent.ExecutorService; 82 import java.util.concurrent.Executors; 83 import java.util.concurrent.Future; 84 import java.util.concurrent.RejectedExecutionException; 85 import java.util.concurrent.ScheduledExecutorService; 86 import java.util.concurrent.TimeUnit; 87 import java.util.concurrent.TimeoutException; 88 import java.util.concurrent.atomic.AtomicBoolean; 89 import java.util.logging.LogRecord; 90 import java.util.logging.Logger; 91 import junit.framework.AssertionFailedError; 92 import junit.framework.TestCase; 93 import org.checkerframework.checker.nullness.qual.Nullable; 94 95 /** 96 * Unit tests for {@link Futures}. 97 * 98 * @author Nishant Thakkar 99 */ 100 @GwtCompatible(emulated = true) 101 public class FuturesTest extends TestCase { 102 private static final Logger aggregateFutureLogger = 103 Logger.getLogger(AggregateFuture.class.getName()); 104 private final TestLogHandler aggregateFutureLogHandler = new TestLogHandler(); 105 106 private static final String DATA1 = "data"; 107 private static final String DATA2 = "more data"; 108 private static final String DATA3 = "most data"; 109 110 @Override setUp()111 public void setUp() throws Exception { 112 super.setUp(); 113 aggregateFutureLogger.addHandler(aggregateFutureLogHandler); 114 } 115 116 @Override tearDown()117 public void tearDown() throws Exception { 118 /* 119 * Clear interrupt for future tests. 120 * 121 * (Ideally we would perform interrupts only in threads that we create, but 122 * it's hard to imagine that anything will break in practice.) 123 */ 124 clearInterrupt(); 125 aggregateFutureLogger.removeHandler(aggregateFutureLogHandler); 126 super.tearDown(); 127 } 128 129 /* 130 * TODO(cpovirk): Use FutureSubject once it's part of core Truth. But be wary of using it when I'm 131 * really testing a Future implementation (e.g., in the case of immediate*Future()). But it's OK 132 * to use in the case of the majority of Futures that are AbstractFutures. 133 */ 134 testImmediateFuture()135 public void testImmediateFuture() throws Exception { 136 ListenableFuture<String> future = immediateFuture(DATA1); 137 138 assertSame(DATA1, getDone(future)); 139 assertSame(DATA1, getDoneFromTimeoutOverload(future)); 140 assertThat(future.toString()).contains("[status=SUCCESS, result=[" + DATA1 + "]]"); 141 } 142 testImmediateVoidFuture()143 public void testImmediateVoidFuture() throws Exception { 144 ListenableFuture<@Nullable Void> voidFuture = immediateVoidFuture(); 145 146 assertThat(getDone(voidFuture)).isNull(); 147 assertThat(getDoneFromTimeoutOverload(voidFuture)).isNull(); 148 assertThat(voidFuture.toString()).contains("[status=SUCCESS, result=[null]]"); 149 } 150 testImmediateFailedFuture()151 public void testImmediateFailedFuture() throws Exception { 152 Exception exception = new Exception(); 153 ListenableFuture<String> future = immediateFailedFuture(exception); 154 assertThat(future.toString()).endsWith("[status=FAILURE, cause=[" + exception + "]]"); 155 156 try { 157 getDone(future); 158 fail(); 159 } catch (ExecutionException expected) { 160 assertSame(exception, expected.getCause()); 161 } 162 163 try { 164 getDoneFromTimeoutOverload(future); 165 fail(); 166 } catch (ExecutionException expected) { 167 assertSame(exception, expected.getCause()); 168 } 169 } 170 testImmediateFailedFuture_cancellationException()171 public void testImmediateFailedFuture_cancellationException() throws Exception { 172 CancellationException exception = new CancellationException(); 173 ListenableFuture<String> future = immediateFailedFuture(exception); 174 assertFalse(future.isCancelled()); 175 assertThat(future.toString()).endsWith("[status=FAILURE, cause=[" + exception + "]]"); 176 177 try { 178 getDone(future); 179 fail(); 180 } catch (ExecutionException expected) { 181 assertSame(exception, expected.getCause()); 182 } 183 184 try { 185 getDoneFromTimeoutOverload(future); 186 fail(); 187 } catch (ExecutionException expected) { 188 assertSame(exception, expected.getCause()); 189 } 190 } 191 testImmediateCancelledFutureBasic()192 public void testImmediateCancelledFutureBasic() throws Exception { 193 ListenableFuture<String> future = CallerClass1.makeImmediateCancelledFuture(); 194 assertTrue(future.isCancelled()); 195 } 196 197 @GwtIncompatible testImmediateCancelledFutureStack()198 public void testImmediateCancelledFutureStack() throws Exception { 199 ListenableFuture<String> future = CallerClass1.makeImmediateCancelledFuture(); 200 assertTrue(future.isCancelled()); 201 202 try { 203 CallerClass2.get(future); 204 fail(); 205 } catch (CancellationException expected) { 206 // There should be two CancellationException chained together. The outer one should have the 207 // stack trace of where the get() call was made, and the inner should have the stack trace of 208 // where the immediateCancelledFuture() call was made. 209 List<StackTraceElement> stackTrace = ImmutableList.copyOf(expected.getStackTrace()); 210 assertFalse(Iterables.any(stackTrace, hasClassName(CallerClass1.class))); 211 assertTrue(Iterables.any(stackTrace, hasClassName(CallerClass2.class))); 212 213 // See AbstractFutureCancellationCauseTest for how to set causes. 214 assertThat(expected.getCause()).isNull(); 215 } 216 } 217 218 @GwtIncompatible // used only in GwtIncompatible tests hasClassName(final Class<?> clazz)219 private static Predicate<StackTraceElement> hasClassName(final Class<?> clazz) { 220 return new Predicate<StackTraceElement>() { 221 @Override 222 public boolean apply(StackTraceElement element) { 223 return element.getClassName().equals(clazz.getName()); 224 } 225 }; 226 } 227 228 private static final class CallerClass1 { 229 static ListenableFuture<String> makeImmediateCancelledFuture() { 230 return immediateCancelledFuture(); 231 } 232 } 233 234 private static final class CallerClass2 { 235 @CanIgnoreReturnValue 236 static <V> V get(ListenableFuture<V> future) throws ExecutionException, InterruptedException { 237 return getDone(future); 238 } 239 } 240 241 private static class MyException extends Exception {} 242 243 // Class hierarchy for generics sanity checks 244 private static class Foo {} 245 246 private static class FooChild extends Foo {} 247 248 private static class Bar {} 249 250 private static class BarChild extends Bar {} 251 252 public void testTransform_genericsNull() throws Exception { 253 ListenableFuture<?> nullFuture = immediateFuture(null); 254 ListenableFuture<?> transformedFuture = transform(nullFuture, constant(null), directExecutor()); 255 assertNull(getDone(transformedFuture)); 256 } 257 258 public void testTransform_genericsHierarchy() throws Exception { 259 ListenableFuture<FooChild> future = immediateFuture(null); 260 final BarChild barChild = new BarChild(); 261 Function<Foo, BarChild> function = 262 new Function<Foo, BarChild>() { 263 @Override 264 public BarChild apply(Foo unused) { 265 return barChild; 266 } 267 }; 268 Bar bar = getDone(transform(future, function, directExecutor())); 269 assertSame(barChild, bar); 270 } 271 272 /* 273 * Android does not handle this stack overflow gracefully... though somehow some other 274 * stack-overflow tests work. It must depend on the exact place the error occurs. 275 */ 276 @AndroidIncompatible 277 @GwtIncompatible // StackOverflowError 278 public void testTransform_StackOverflow() throws Exception { 279 { 280 /* 281 * Initialize all relevant classes before running the test, which may otherwise poison any 282 * classes it is trying to load during its stack overflow. 283 */ 284 SettableFuture<Object> root = SettableFuture.create(); 285 ListenableFuture<Object> unused = transform(root, identity(), directExecutor()); 286 root.set("foo"); 287 } 288 289 SettableFuture<Object> root = SettableFuture.create(); 290 ListenableFuture<Object> output = root; 291 for (int i = 0; i < 10000; i++) { 292 output = transform(output, identity(), directExecutor()); 293 } 294 try { 295 root.set("foo"); 296 fail(); 297 } catch (StackOverflowError expected) { 298 } 299 } 300 301 public void testTransform_ErrorAfterCancellation() throws Exception { 302 class Transformer implements Function<Object, Object> { 303 ListenableFuture<Object> output; 304 305 @Override 306 public Object apply(Object input) { 307 output.cancel(false); 308 throw new MyError(); 309 } 310 } 311 Transformer transformer = new Transformer(); 312 SettableFuture<Object> input = SettableFuture.create(); 313 314 ListenableFuture<Object> output = transform(input, transformer, directExecutor()); 315 transformer.output = output; 316 317 input.set("foo"); 318 assertTrue(output.isCancelled()); 319 } 320 321 public void testTransform_ExceptionAfterCancellation() throws Exception { 322 class Transformer implements Function<Object, Object> { 323 ListenableFuture<Object> output; 324 325 @Override 326 public Object apply(Object input) { 327 output.cancel(false); 328 throw new MyRuntimeException(); 329 } 330 } 331 Transformer transformer = new Transformer(); 332 SettableFuture<Object> input = SettableFuture.create(); 333 334 ListenableFuture<Object> output = transform(input, transformer, directExecutor()); 335 transformer.output = output; 336 337 input.set("foo"); 338 assertTrue(output.isCancelled()); 339 } 340 341 public void testTransform_getThrowsRuntimeException() throws Exception { 342 ListenableFuture<Object> input = 343 UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); 344 345 ListenableFuture<Object> output = transform(input, identity(), directExecutor()); 346 try { 347 getDone(output); 348 fail(); 349 } catch (ExecutionException expected) { 350 assertThat(expected.getCause()).isInstanceOf(MyRuntimeException.class); 351 } 352 } 353 354 public void testTransform_getThrowsError() throws Exception { 355 ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); 356 357 ListenableFuture<Object> output = transform(input, identity(), directExecutor()); 358 try { 359 getDone(output); 360 fail(); 361 } catch (ExecutionException expected) { 362 assertThat(expected.getCause()).isInstanceOf(MyError.class); 363 } 364 } 365 366 public void testTransform_listenerThrowsError() throws Exception { 367 SettableFuture<Object> input = SettableFuture.create(); 368 ListenableFuture<Object> output = transform(input, identity(), directExecutor()); 369 370 output.addListener( 371 new Runnable() { 372 @Override 373 public void run() { 374 throw new MyError(); 375 } 376 }, 377 directExecutor()); 378 try { 379 input.set("foo"); 380 fail(); 381 } catch (MyError expected) { 382 } 383 } 384 385 public void testTransformAsync_cancelPropagatesToInput() throws Exception { 386 SettableFuture<Foo> input = SettableFuture.create(); 387 AsyncFunction<Foo, Bar> function = 388 new AsyncFunction<Foo, Bar>() { 389 @Override 390 public ListenableFuture<Bar> apply(Foo unused) { 391 throw new AssertionFailedError("Unexpected call to apply."); 392 } 393 }; 394 assertTrue(transformAsync(input, function, directExecutor()).cancel(false)); 395 assertTrue(input.isCancelled()); 396 assertFalse(input.wasInterrupted()); 397 } 398 399 public void testTransformAsync_interruptPropagatesToInput() throws Exception { 400 SettableFuture<Foo> input = SettableFuture.create(); 401 AsyncFunction<Foo, Bar> function = 402 new AsyncFunction<Foo, Bar>() { 403 @Override 404 public ListenableFuture<Bar> apply(Foo unused) { 405 throw new AssertionFailedError("Unexpected call to apply."); 406 } 407 }; 408 assertTrue(transformAsync(input, function, directExecutor()).cancel(true)); 409 assertTrue(input.isCancelled()); 410 assertTrue(input.wasInterrupted()); 411 } 412 413 @GwtIncompatible // threads 414 public void testTransformAsync_interruptPropagatesToTransformingThread() throws Exception { 415 SettableFuture<String> input = SettableFuture.create(); 416 final CountDownLatch inFunction = new CountDownLatch(1); 417 final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); 418 final CountDownLatch gotException = new CountDownLatch(1); 419 AsyncFunction<String, String> function = 420 new AsyncFunction<String, String>() { 421 @Override 422 public ListenableFuture<String> apply(String s) throws Exception { 423 inFunction.countDown(); 424 try { 425 shouldCompleteFunction.await(); 426 } catch (InterruptedException expected) { 427 gotException.countDown(); 428 throw expected; 429 } 430 return immediateFuture("a"); 431 } 432 }; 433 434 ListenableFuture<String> futureResult = 435 transformAsync(input, function, newSingleThreadExecutor()); 436 437 input.set("value"); 438 inFunction.await(); 439 futureResult.cancel(true); 440 shouldCompleteFunction.countDown(); 441 try { 442 futureResult.get(); 443 fail(); 444 } catch (CancellationException expected) { 445 } 446 // TODO(cpovirk): implement interruption, updating this test: 447 // https://github.com/google/guava/issues/1989 448 assertEquals(1, gotException.getCount()); 449 // gotException.await(); 450 } 451 452 public void testTransformAsync_cancelPropagatesToAsyncOutput() throws Exception { 453 ListenableFuture<Foo> immediate = immediateFuture(new Foo()); 454 final SettableFuture<Bar> secondary = SettableFuture.create(); 455 AsyncFunction<Foo, Bar> function = 456 new AsyncFunction<Foo, Bar>() { 457 @Override 458 public ListenableFuture<Bar> apply(Foo unused) { 459 return secondary; 460 } 461 }; 462 assertTrue(transformAsync(immediate, function, directExecutor()).cancel(false)); 463 assertTrue(secondary.isCancelled()); 464 assertFalse(secondary.wasInterrupted()); 465 } 466 467 public void testTransformAsync_interruptPropagatesToAsyncOutput() throws Exception { 468 ListenableFuture<Foo> immediate = immediateFuture(new Foo()); 469 final SettableFuture<Bar> secondary = SettableFuture.create(); 470 AsyncFunction<Foo, Bar> function = 471 new AsyncFunction<Foo, Bar>() { 472 @Override 473 public ListenableFuture<Bar> apply(Foo unused) { 474 return secondary; 475 } 476 }; 477 assertTrue(transformAsync(immediate, function, directExecutor()).cancel(true)); 478 assertTrue(secondary.isCancelled()); 479 assertTrue(secondary.wasInterrupted()); 480 } 481 482 public void testTransformAsync_inputCancelButNotInterruptPropagatesToOutput() throws Exception { 483 SettableFuture<Foo> f1 = SettableFuture.create(); 484 final SettableFuture<Bar> secondary = SettableFuture.create(); 485 AsyncFunction<Foo, Bar> function = 486 new AsyncFunction<Foo, Bar>() { 487 @Override 488 public ListenableFuture<Bar> apply(Foo unused) { 489 return secondary; 490 } 491 }; 492 ListenableFuture<Bar> f2 = transformAsync(f1, function, directExecutor()); 493 f1.cancel(true); 494 assertTrue(f2.isCancelled()); 495 /* 496 * We might like to propagate interruption, too, but it's not clear that it matters. For now, we 497 * test for the behavior that we have today. 498 */ 499 assertFalse(((AbstractFuture<?>) f2).wasInterrupted()); 500 } 501 502 /* 503 * Android does not handle this stack overflow gracefully... though somehow some other 504 * stack-overflow tests work. It must depend on the exact place the error occurs. 505 */ 506 @AndroidIncompatible 507 @GwtIncompatible // StackOverflowError 508 public void testTransformAsync_StackOverflow() throws Exception { 509 { 510 /* 511 * Initialize all relevant classes before running the test, which may otherwise poison any 512 * classes it is trying to load during its stack overflow. 513 */ 514 SettableFuture<Object> root = SettableFuture.create(); 515 ListenableFuture<Object> unused = transformAsync(root, asyncIdentity(), directExecutor()); 516 root.set("foo"); 517 } 518 519 SettableFuture<Object> root = SettableFuture.create(); 520 ListenableFuture<Object> output = root; 521 for (int i = 0; i < 10000; i++) { 522 output = transformAsync(output, asyncIdentity(), directExecutor()); 523 } 524 try { 525 root.set("foo"); 526 fail(); 527 } catch (StackOverflowError expected) { 528 } 529 } 530 531 public void testTransformAsync_ErrorAfterCancellation() throws Exception { 532 class Transformer implements AsyncFunction<Object, Object> { 533 ListenableFuture<Object> output; 534 535 @Override 536 public ListenableFuture<Object> apply(Object input) { 537 output.cancel(false); 538 throw new MyError(); 539 } 540 } 541 Transformer transformer = new Transformer(); 542 SettableFuture<Object> input = SettableFuture.create(); 543 544 ListenableFuture<Object> output = transformAsync(input, transformer, directExecutor()); 545 transformer.output = output; 546 547 input.set("foo"); 548 assertTrue(output.isCancelled()); 549 } 550 551 public void testTransformAsync_ExceptionAfterCancellation() throws Exception { 552 class Transformer implements AsyncFunction<Object, Object> { 553 ListenableFuture<Object> output; 554 555 @Override 556 public ListenableFuture<Object> apply(Object input) { 557 output.cancel(false); 558 throw new MyRuntimeException(); 559 } 560 } 561 Transformer transformer = new Transformer(); 562 SettableFuture<Object> input = SettableFuture.create(); 563 564 ListenableFuture<Object> output = transformAsync(input, transformer, directExecutor()); 565 transformer.output = output; 566 567 input.set("foo"); 568 assertTrue(output.isCancelled()); 569 } 570 571 public void testTransformAsync_getThrowsRuntimeException() throws Exception { 572 ListenableFuture<Object> input = 573 UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); 574 575 ListenableFuture<Object> output = transformAsync(input, asyncIdentity(), directExecutor()); 576 try { 577 getDone(output); 578 fail(); 579 } catch (ExecutionException expected) { 580 assertThat(expected.getCause()).isInstanceOf(MyRuntimeException.class); 581 } 582 } 583 584 public void testTransformAsync_getThrowsError() throws Exception { 585 ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); 586 587 ListenableFuture<Object> output = transformAsync(input, asyncIdentity(), directExecutor()); 588 try { 589 getDone(output); 590 fail(); 591 } catch (ExecutionException expected) { 592 assertThat(expected.getCause()).isInstanceOf(MyError.class); 593 } 594 } 595 596 public void testTransformAsync_listenerThrowsError() throws Exception { 597 SettableFuture<Object> input = SettableFuture.create(); 598 ListenableFuture<Object> output = transformAsync(input, asyncIdentity(), directExecutor()); 599 600 output.addListener( 601 new Runnable() { 602 @Override 603 public void run() { 604 throw new MyError(); 605 } 606 }, 607 directExecutor()); 608 try { 609 input.set("foo"); 610 fail(); 611 } catch (MyError expected) { 612 } 613 } 614 615 public void testTransform_rejectionPropagatesToOutput() throws Exception { 616 SettableFuture<Foo> input = SettableFuture.create(); 617 Function<Foo, Foo> identity = identity(); 618 ListenableFuture<Foo> transformed = transform(input, identity, REJECTING_EXECUTOR); 619 input.set(new Foo()); 620 try { 621 getDone(transformed); 622 fail(); 623 } catch (ExecutionException expected) { 624 assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); 625 } 626 } 627 628 public void testTransformAsync_rejectionPropagatesToOutput() throws Exception { 629 SettableFuture<Foo> input = SettableFuture.create(); 630 AsyncFunction<Foo, Foo> asyncIdentity = asyncIdentity(); 631 ListenableFuture<Foo> transformed = transformAsync(input, asyncIdentity, REJECTING_EXECUTOR); 632 input.set(new Foo()); 633 try { 634 getDone(transformed); 635 fail(); 636 } catch (ExecutionException expected) { 637 assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); 638 } 639 } 640 641 /** Tests that the function is invoked only once, even if it throws an exception. */ 642 public void testTransformValueRemainsMemoized() throws Exception { 643 class Holder { 644 645 int value = 2; 646 } 647 final Holder holder = new Holder(); 648 649 // This function adds the holder's value to the input value. 650 Function<Integer, Integer> adder = 651 new Function<Integer, Integer>() { 652 @Override 653 public Integer apply(Integer from) { 654 return from + holder.value; 655 } 656 }; 657 658 // Since holder.value is 2, applying 4 should yield 6. 659 assertEquals(6, adder.apply(4).intValue()); 660 661 ListenableFuture<Integer> immediateFuture = immediateFuture(4); 662 Future<Integer> transformedFuture = transform(immediateFuture, adder, directExecutor()); 663 664 // The composed future also yields 6. 665 assertEquals(6, getDone(transformedFuture).intValue()); 666 667 // Repeated calls yield the same value even though the function's behavior 668 // changes 669 holder.value = 3; 670 assertEquals(6, getDone(transformedFuture).intValue()); 671 assertEquals(7, adder.apply(4).intValue()); 672 673 // Once more, with feeling. 674 holder.value = 4; 675 assertEquals(6, getDone(transformedFuture).intValue()); 676 assertEquals(8, adder.apply(4).intValue()); 677 678 // Memoized get also retains the value. 679 assertEquals(6, getDoneFromTimeoutOverload(transformedFuture).intValue()); 680 681 // Unsurprisingly, recomposing the future will return an updated value. 682 assertEquals(8, getDone(transform(immediateFuture, adder, directExecutor())).intValue()); 683 684 // Repeating, with the timeout version 685 assertEquals( 686 8, 687 getDoneFromTimeoutOverload(transform(immediateFuture, adder, directExecutor())).intValue()); 688 } 689 690 static class MyError extends Error {} 691 692 static class MyRuntimeException extends RuntimeException {} 693 694 /** 695 * Test that the function is invoked only once, even if it throws an exception. Also, test that 696 * that function's result is wrapped in an ExecutionException. 697 */ 698 @GwtIncompatible // reflection 699 public void testTransformExceptionRemainsMemoized() throws Throwable { 700 // We need to test with two input futures since ExecutionList.execute 701 // doesn't catch Errors and we cannot depend on the order that our 702 // transformations run. (So it is possible that the Error being thrown 703 // could prevent our second transformations from running). 704 SettableFuture<Integer> exceptionInput = SettableFuture.create(); 705 ListenableFuture<Integer> exceptionComposedFuture = 706 transform(exceptionInput, newOneTimeExceptionThrower(), directExecutor()); 707 exceptionInput.set(0); 708 runGetIdempotencyTest(exceptionComposedFuture, MyRuntimeException.class); 709 710 SettableFuture<Integer> errorInput = SettableFuture.create(); 711 ListenableFuture<Integer> errorComposedFuture = 712 transform(errorInput, newOneTimeErrorThrower(), directExecutor()); 713 errorInput.set(0); 714 715 runGetIdempotencyTest(errorComposedFuture, MyError.class); 716 717 /* 718 * Try again when the input's value is already filled in, since the flow is 719 * slightly different in that case. 720 */ 721 exceptionComposedFuture = 722 transform(exceptionInput, newOneTimeExceptionThrower(), directExecutor()); 723 runGetIdempotencyTest(exceptionComposedFuture, MyRuntimeException.class); 724 725 runGetIdempotencyTest( 726 transform(errorInput, newOneTimeErrorThrower(), directExecutor()), MyError.class); 727 runGetIdempotencyTest(errorComposedFuture, MyError.class); 728 } 729 730 @GwtIncompatible // reflection 731 private static void runGetIdempotencyTest( 732 Future<Integer> transformedFuture, Class<? extends Throwable> expectedExceptionClass) 733 throws Throwable { 734 for (int i = 0; i < 5; i++) { 735 try { 736 getDone(transformedFuture); 737 fail(); 738 } catch (ExecutionException expected) { 739 if (!expectedExceptionClass.isInstance(expected.getCause())) { 740 throw expected.getCause(); 741 } 742 } 743 } 744 } 745 746 @GwtIncompatible // used only in GwtIncompatible tests 747 private static Function<Integer, Integer> newOneTimeExceptionThrower() { 748 return new Function<Integer, Integer>() { 749 int calls = 0; 750 751 @Override 752 public Integer apply(Integer from) { 753 if (++calls > 1) { 754 fail(); 755 } 756 throw new MyRuntimeException(); 757 } 758 }; 759 } 760 761 @GwtIncompatible // used only in GwtIncompatible tests 762 private static Function<Integer, Integer> newOneTimeErrorThrower() { 763 return new Function<Integer, Integer>() { 764 int calls = 0; 765 766 @Override 767 public Integer apply(Integer from) { 768 if (++calls > 1) { 769 fail(); 770 } 771 throw new MyError(); 772 } 773 }; 774 } 775 776 // TODO(cpovirk): top-level class? 777 static class ExecutorSpy implements Executor { 778 779 Executor delegate; 780 boolean wasExecuted; 781 782 public ExecutorSpy(Executor delegate) { 783 this.delegate = delegate; 784 } 785 786 @Override 787 public void execute(Runnable command) { 788 delegate.execute(command); 789 wasExecuted = true; 790 } 791 } 792 793 public void testTransform_Executor() throws Exception { 794 Object value = new Object(); 795 ExecutorSpy spy = new ExecutorSpy(directExecutor()); 796 797 assertFalse(spy.wasExecuted); 798 799 ListenableFuture<Object> future = transform(immediateFuture(value), identity(), spy); 800 801 assertSame(value, getDone(future)); 802 assertTrue(spy.wasExecuted); 803 } 804 805 @GwtIncompatible // Threads 806 public void testTransformAsync_functionToString() throws Exception { 807 final CountDownLatch functionCalled = new CountDownLatch(1); 808 final CountDownLatch functionBlocking = new CountDownLatch(1); 809 AsyncFunction<Object, Object> function = 810 tagged( 811 "Called my toString", 812 new AsyncFunction<Object, Object>() { 813 @Override 814 public ListenableFuture<Object> apply(Object input) throws Exception { 815 functionCalled.countDown(); 816 functionBlocking.await(); 817 return immediateFuture(null); 818 } 819 }); 820 821 ExecutorService executor = Executors.newSingleThreadExecutor(); 822 try { 823 ListenableFuture<?> output = 824 Futures.transformAsync(immediateFuture(null), function, executor); 825 functionCalled.await(); 826 assertThat(output.toString()).contains(function.toString()); 827 } finally { 828 functionBlocking.countDown(); 829 executor.shutdown(); 830 } 831 } 832 833 @GwtIncompatible // lazyTransform 834 public void testLazyTransform() throws Exception { 835 FunctionSpy<Object, String> spy = new FunctionSpy<>(constant("bar")); 836 Future<String> input = immediateFuture("foo"); 837 Future<String> transformed = lazyTransform(input, spy); 838 spy.verifyCallCount(0); 839 assertEquals("bar", getDone(transformed)); 840 spy.verifyCallCount(1); 841 assertEquals("bar", getDone(transformed)); 842 spy.verifyCallCount(2); 843 } 844 845 @GwtIncompatible // lazyTransform 846 public void testLazyTransform_exception() throws Exception { 847 final RuntimeException exception = new RuntimeException("deliberate"); 848 Function<Integer, String> function = 849 new Function<Integer, String>() { 850 @Override 851 public String apply(Integer input) { 852 throw exception; 853 } 854 }; 855 Future<String> transformed = lazyTransform(immediateFuture(1), function); 856 try { 857 getDone(transformed); 858 fail(); 859 } catch (ExecutionException expected) { 860 assertSame(exception, expected.getCause()); 861 } 862 try { 863 getDoneFromTimeoutOverload(transformed); 864 fail(); 865 } catch (ExecutionException expected) { 866 assertSame(exception, expected.getCause()); 867 } 868 } 869 870 private static class FunctionSpy<I, O> implements Function<I, O> { 871 private int applyCount; 872 private final Function<I, O> delegate; 873 874 public FunctionSpy(Function<I, O> delegate) { 875 this.delegate = delegate; 876 } 877 878 @Override 879 public O apply(I input) { 880 applyCount++; 881 return delegate.apply(input); 882 } 883 884 void verifyCallCount(int expected) { 885 assertThat(applyCount).isEqualTo(expected); 886 } 887 } 888 889 private static <X extends Throwable, V> Function<X, V> unexpectedFunction() { 890 return new Function<X, V>() { 891 @Override 892 public V apply(X t) { 893 throw newAssertionError("Unexpected fallback", t); 894 } 895 }; 896 } 897 898 private static class AsyncFunctionSpy<X extends Throwable, V> implements AsyncFunction<X, V> { 899 private int count; 900 private final AsyncFunction<X, V> delegate; 901 902 public AsyncFunctionSpy(AsyncFunction<X, V> delegate) { 903 this.delegate = delegate; 904 } 905 906 @Override 907 public final ListenableFuture<V> apply(X t) throws Exception { 908 count++; 909 return delegate.apply(t); 910 } 911 912 void verifyCallCount(int expected) { 913 assertThat(count).isEqualTo(expected); 914 } 915 } 916 917 private static <I, O> FunctionSpy<I, O> spy(Function<I, O> delegate) { 918 return new FunctionSpy<>(delegate); 919 } 920 921 private static <X extends Throwable, V> AsyncFunctionSpy<X, V> spy(AsyncFunction<X, V> delegate) { 922 return new AsyncFunctionSpy<>(delegate); 923 } 924 925 private static <X extends Throwable, V> AsyncFunction<X, V> unexpectedAsyncFunction() { 926 return new AsyncFunction<X, V>() { 927 @Override 928 public ListenableFuture<V> apply(X t) { 929 throw newAssertionError("Unexpected fallback", t); 930 } 931 }; 932 } 933 934 /** Alternative to AssertionError(String, Throwable), which doesn't exist in GWT 2.6.1. */ 935 private static AssertionError newAssertionError(String message, Throwable cause) { 936 AssertionError e = new AssertionError(message); 937 e.initCause(cause); 938 return e; 939 } 940 941 // catchingAsync tests cloned from the old withFallback tests: 942 943 public void testCatchingAsync_inputDoesNotRaiseException() throws Exception { 944 AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); 945 ListenableFuture<Integer> originalFuture = immediateFuture(7); 946 ListenableFuture<Integer> faultTolerantFuture = 947 catchingAsync(originalFuture, Throwable.class, fallback, directExecutor()); 948 assertEquals(7, getDone(faultTolerantFuture).intValue()); 949 } 950 951 public void testCatchingAsync_inputRaisesException() throws Exception { 952 final RuntimeException raisedException = new RuntimeException(); 953 AsyncFunctionSpy<Throwable, Integer> fallback = 954 spy( 955 new AsyncFunction<Throwable, Integer>() { 956 @Override 957 public ListenableFuture<Integer> apply(Throwable t) throws Exception { 958 assertThat(t).isSameInstanceAs(raisedException); 959 return immediateFuture(20); 960 } 961 }); 962 ListenableFuture<Integer> failingFuture = immediateFailedFuture(raisedException); 963 ListenableFuture<Integer> faultTolerantFuture = 964 catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); 965 assertEquals(20, getDone(faultTolerantFuture).intValue()); 966 fallback.verifyCallCount(1); 967 } 968 969 @GwtIncompatible // non-Throwable exceptionType 970 public void testCatchingAsync_inputCancelledWithoutFallback() throws Exception { 971 AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); 972 ListenableFuture<Integer> originalFuture = immediateCancelledFuture(); 973 ListenableFuture<Integer> faultTolerantFuture = 974 catchingAsync(originalFuture, IOException.class, fallback, directExecutor()); 975 assertTrue(faultTolerantFuture.isCancelled()); 976 } 977 978 public void testCatchingAsync_fallbackGeneratesRuntimeException() throws Exception { 979 RuntimeException expectedException = new RuntimeException(); 980 runExpectedExceptionCatchingAsyncTest(expectedException, false); 981 } 982 983 public void testCatchingAsync_fallbackGeneratesCheckedException() throws Exception { 984 Exception expectedException = new Exception() {}; 985 runExpectedExceptionCatchingAsyncTest(expectedException, false); 986 } 987 988 public void testCatchingAsync_fallbackGeneratesError() throws Exception { 989 final Error error = new Error("deliberate"); 990 AsyncFunction<Throwable, Integer> fallback = 991 new AsyncFunction<Throwable, Integer>() { 992 @Override 993 public ListenableFuture<Integer> apply(Throwable t) throws Exception { 994 throw error; 995 } 996 }; 997 ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); 998 try { 999 getDone(catchingAsync(failingFuture, Throwable.class, fallback, directExecutor())); 1000 fail(); 1001 } catch (ExecutionException expected) { 1002 assertSame(error, expected.getCause()); 1003 } 1004 } 1005 1006 public void testCatchingAsync_fallbackReturnsRuntimeException() throws Exception { 1007 RuntimeException expectedException = new RuntimeException(); 1008 runExpectedExceptionCatchingAsyncTest(expectedException, true); 1009 } 1010 1011 public void testCatchingAsync_fallbackReturnsCheckedException() throws Exception { 1012 Exception expectedException = new Exception() {}; 1013 runExpectedExceptionCatchingAsyncTest(expectedException, true); 1014 } 1015 1016 private void runExpectedExceptionCatchingAsyncTest( 1017 final Exception expectedException, final boolean wrapInFuture) throws Exception { 1018 AsyncFunctionSpy<Throwable, Integer> fallback = 1019 spy( 1020 new AsyncFunction<Throwable, Integer>() { 1021 @Override 1022 public ListenableFuture<Integer> apply(Throwable t) throws Exception { 1023 if (!wrapInFuture) { 1024 throw expectedException; 1025 } else { 1026 return immediateFailedFuture(expectedException); 1027 } 1028 } 1029 }); 1030 1031 ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); 1032 1033 ListenableFuture<Integer> faultTolerantFuture = 1034 catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); 1035 try { 1036 getDone(faultTolerantFuture); 1037 fail(); 1038 } catch (ExecutionException expected) { 1039 assertSame(expectedException, expected.getCause()); 1040 } 1041 fallback.verifyCallCount(1); 1042 } 1043 1044 public void testCatchingAsync_fallbackNotReady() throws Exception { 1045 ListenableFuture<Integer> primary = immediateFailedFuture(new Exception()); 1046 final SettableFuture<Integer> secondary = SettableFuture.create(); 1047 AsyncFunction<Throwable, Integer> fallback = 1048 new AsyncFunction<Throwable, Integer>() { 1049 @Override 1050 public ListenableFuture<Integer> apply(Throwable t) { 1051 return secondary; 1052 } 1053 }; 1054 ListenableFuture<Integer> derived = 1055 catchingAsync(primary, Throwable.class, fallback, directExecutor()); 1056 secondary.set(1); 1057 assertEquals(1, (int) getDone(derived)); 1058 } 1059 1060 public void testCatchingAsync_resultInterruptedBeforeFallback() throws Exception { 1061 SettableFuture<Integer> primary = SettableFuture.create(); 1062 AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); 1063 ListenableFuture<Integer> derived = 1064 catchingAsync(primary, Throwable.class, fallback, directExecutor()); 1065 derived.cancel(true); 1066 assertTrue(primary.isCancelled()); 1067 assertTrue(primary.wasInterrupted()); 1068 } 1069 1070 public void testCatchingAsync_resultCancelledBeforeFallback() throws Exception { 1071 SettableFuture<Integer> primary = SettableFuture.create(); 1072 AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); 1073 ListenableFuture<Integer> derived = 1074 catchingAsync(primary, Throwable.class, fallback, directExecutor()); 1075 derived.cancel(false); 1076 assertTrue(primary.isCancelled()); 1077 assertFalse(primary.wasInterrupted()); 1078 } 1079 1080 @GwtIncompatible // mocks 1081 // TODO(cpovirk): eliminate use of mocks 1082 @SuppressWarnings("unchecked") 1083 public void testCatchingAsync_resultCancelledAfterFallback() throws Exception { 1084 final SettableFuture<Integer> secondary = SettableFuture.create(); 1085 final RuntimeException raisedException = new RuntimeException(); 1086 AsyncFunctionSpy<Throwable, Integer> fallback = 1087 spy( 1088 new AsyncFunction<Throwable, Integer>() { 1089 @Override 1090 public ListenableFuture<Integer> apply(Throwable t) throws Exception { 1091 assertThat(t).isSameInstanceAs(raisedException); 1092 return secondary; 1093 } 1094 }); 1095 1096 ListenableFuture<Integer> failingFuture = immediateFailedFuture(raisedException); 1097 1098 ListenableFuture<Integer> derived = 1099 catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); 1100 derived.cancel(false); 1101 assertTrue(secondary.isCancelled()); 1102 assertFalse(secondary.wasInterrupted()); 1103 fallback.verifyCallCount(1); 1104 } 1105 1106 public void testCatchingAsync_nullInsteadOfFuture() throws Exception { 1107 ListenableFuture<Integer> inputFuture = immediateFailedFuture(new Exception()); 1108 ListenableFuture<?> chainedFuture = 1109 catchingAsync( 1110 inputFuture, 1111 Throwable.class, 1112 new AsyncFunction<Throwable, Integer>() { 1113 @Override 1114 @SuppressWarnings("AsyncFunctionReturnsNull") 1115 public ListenableFuture<Integer> apply(Throwable t) { 1116 return null; 1117 } 1118 }, 1119 directExecutor()); 1120 try { 1121 getDone(chainedFuture); 1122 fail(); 1123 } catch (ExecutionException expected) { 1124 NullPointerException cause = (NullPointerException) expected.getCause(); 1125 assertThat(cause) 1126 .hasMessageThat() 1127 .contains( 1128 "AsyncFunction.apply returned null instead of a Future. " 1129 + "Did you mean to return immediateFuture(null)?"); 1130 } 1131 } 1132 1133 @GwtIncompatible // threads 1134 public void testCatchingAsync_interruptPropagatesToTransformingThread() throws Exception { 1135 SettableFuture<String> input = SettableFuture.create(); 1136 final CountDownLatch inFunction = new CountDownLatch(1); 1137 final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); 1138 final CountDownLatch gotException = new CountDownLatch(1); 1139 AsyncFunction<Throwable, String> function = 1140 new AsyncFunction<Throwable, String>() { 1141 @Override 1142 public ListenableFuture<String> apply(Throwable t) throws Exception { 1143 inFunction.countDown(); 1144 try { 1145 shouldCompleteFunction.await(); 1146 } catch (InterruptedException expected) { 1147 gotException.countDown(); 1148 throw expected; 1149 } 1150 return immediateFuture("a"); 1151 } 1152 }; 1153 1154 ListenableFuture<String> futureResult = 1155 catchingAsync(input, Exception.class, function, newSingleThreadExecutor()); 1156 1157 input.setException(new Exception()); 1158 inFunction.await(); 1159 futureResult.cancel(true); 1160 shouldCompleteFunction.countDown(); 1161 try { 1162 futureResult.get(); 1163 fail(); 1164 } catch (CancellationException expected) { 1165 } 1166 // TODO(cpovirk): implement interruption, updating this test: 1167 // https://github.com/google/guava/issues/1989 1168 assertEquals(1, gotException.getCount()); 1169 // gotException.await(); 1170 } 1171 1172 @GwtIncompatible // Threads 1173 public void testCatchingAsync_functionToString() throws Exception { 1174 final CountDownLatch functionCalled = new CountDownLatch(1); 1175 final CountDownLatch functionBlocking = new CountDownLatch(1); 1176 AsyncFunction<Object, Object> function = 1177 tagged( 1178 "Called my toString", 1179 new AsyncFunction<Object, Object>() { 1180 @Override 1181 public ListenableFuture<Object> apply(Object input) throws Exception { 1182 functionCalled.countDown(); 1183 functionBlocking.await(); 1184 return immediateFuture(null); 1185 } 1186 }); 1187 1188 ExecutorService executor = Executors.newSingleThreadExecutor(); 1189 try { 1190 ListenableFuture<?> output = 1191 Futures.catchingAsync( 1192 immediateFailedFuture(new RuntimeException()), Throwable.class, function, executor); 1193 functionCalled.await(); 1194 assertThat(output.toString()).contains(function.toString()); 1195 } finally { 1196 functionBlocking.countDown(); 1197 executor.shutdown(); 1198 } 1199 } 1200 1201 public void testCatchingAsync_futureToString() throws Exception { 1202 final SettableFuture<Object> toReturn = SettableFuture.create(); 1203 AsyncFunction<Object, Object> function = 1204 tagged( 1205 "Called my toString", 1206 new AsyncFunction<Object, Object>() { 1207 @Override 1208 public ListenableFuture<Object> apply(Object input) throws Exception { 1209 return toReturn; 1210 } 1211 }); 1212 1213 ListenableFuture<?> output = 1214 Futures.catchingAsync( 1215 immediateFailedFuture(new RuntimeException()), 1216 Throwable.class, 1217 function, 1218 directExecutor()); 1219 assertThat(output.toString()).contains(toReturn.toString()); 1220 } 1221 1222 // catching tests cloned from the old withFallback tests: 1223 1224 public void testCatching_inputDoesNotRaiseException() throws Exception { 1225 Function<Throwable, Integer> fallback = unexpectedFunction(); 1226 ListenableFuture<Integer> originalFuture = immediateFuture(7); 1227 ListenableFuture<Integer> faultTolerantFuture = 1228 catching(originalFuture, Throwable.class, fallback, directExecutor()); 1229 assertEquals(7, getDone(faultTolerantFuture).intValue()); 1230 } 1231 1232 public void testCatching_inputRaisesException() throws Exception { 1233 final RuntimeException raisedException = new RuntimeException(); 1234 FunctionSpy<Throwable, Integer> fallback = 1235 spy( 1236 new Function<Throwable, Integer>() { 1237 @Override 1238 public Integer apply(Throwable t) { 1239 assertThat(t).isSameInstanceAs(raisedException); 1240 return 20; 1241 } 1242 }); 1243 ListenableFuture<Integer> failingFuture = immediateFailedFuture(raisedException); 1244 ListenableFuture<Integer> faultTolerantFuture = 1245 catching(failingFuture, Throwable.class, fallback, directExecutor()); 1246 assertEquals(20, getDone(faultTolerantFuture).intValue()); 1247 fallback.verifyCallCount(1); 1248 } 1249 1250 @GwtIncompatible // non-Throwable exceptionType 1251 public void testCatching_inputCancelledWithoutFallback() throws Exception { 1252 Function<IOException, Integer> fallback = unexpectedFunction(); 1253 ListenableFuture<Integer> originalFuture = immediateCancelledFuture(); 1254 ListenableFuture<Integer> faultTolerantFuture = 1255 catching(originalFuture, IOException.class, fallback, directExecutor()); 1256 assertTrue(faultTolerantFuture.isCancelled()); 1257 } 1258 1259 public void testCatching_fallbackGeneratesRuntimeException() throws Exception { 1260 RuntimeException expectedException = new RuntimeException(); 1261 runExpectedExceptionCatchingTest(expectedException); 1262 } 1263 1264 /* 1265 * catching() uses a plain Function, so there's no 1266 * testCatching_fallbackGeneratesCheckedException(). 1267 */ 1268 1269 public void testCatching_fallbackGeneratesError() throws Exception { 1270 final Error error = new Error("deliberate"); 1271 Function<Throwable, Integer> fallback = 1272 new Function<Throwable, Integer>() { 1273 @Override 1274 public Integer apply(Throwable t) { 1275 throw error; 1276 } 1277 }; 1278 ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); 1279 try { 1280 getDone(catching(failingFuture, Throwable.class, fallback, directExecutor())); 1281 fail(); 1282 } catch (ExecutionException expected) { 1283 assertSame(error, expected.getCause()); 1284 } 1285 } 1286 1287 /* 1288 * catching() uses a plain Function, so there's no testCatching_fallbackReturnsRuntimeException() 1289 * or testCatching_fallbackReturnsCheckedException(). 1290 */ 1291 1292 private void runExpectedExceptionCatchingTest(final RuntimeException expectedException) 1293 throws Exception { 1294 FunctionSpy<Throwable, Integer> fallback = 1295 spy( 1296 new Function<Throwable, Integer>() { 1297 @Override 1298 public Integer apply(Throwable t) { 1299 throw expectedException; 1300 } 1301 }); 1302 1303 ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); 1304 1305 ListenableFuture<Integer> faultTolerantFuture = 1306 catching(failingFuture, Throwable.class, fallback, directExecutor()); 1307 try { 1308 getDone(faultTolerantFuture); 1309 fail(); 1310 } catch (ExecutionException expected) { 1311 assertSame(expectedException, expected.getCause()); 1312 } 1313 fallback.verifyCallCount(1); 1314 } 1315 1316 // catching() uses a plain Function, so there's no testCatching_fallbackNotReady(). 1317 1318 public void testCatching_resultInterruptedBeforeFallback() throws Exception { 1319 SettableFuture<Integer> primary = SettableFuture.create(); 1320 Function<Throwable, Integer> fallback = unexpectedFunction(); 1321 ListenableFuture<Integer> derived = 1322 catching(primary, Throwable.class, fallback, directExecutor()); 1323 derived.cancel(true); 1324 assertTrue(primary.isCancelled()); 1325 assertTrue(primary.wasInterrupted()); 1326 } 1327 1328 public void testCatching_resultCancelledBeforeFallback() throws Exception { 1329 SettableFuture<Integer> primary = SettableFuture.create(); 1330 Function<Throwable, Integer> fallback = unexpectedFunction(); 1331 ListenableFuture<Integer> derived = 1332 catching(primary, Throwable.class, fallback, directExecutor()); 1333 derived.cancel(false); 1334 assertTrue(primary.isCancelled()); 1335 assertFalse(primary.wasInterrupted()); 1336 } 1337 1338 // catching() uses a plain Function, so there's no testCatching_resultCancelledAfterFallback(). 1339 1340 // catching() uses a plain Function, so there's no testCatching_nullInsteadOfFuture(). 1341 1342 // Some tests of the exceptionType parameter: 1343 1344 public void testCatching_Throwable() throws Exception { 1345 Function<Throwable, Integer> fallback = functionReturningOne(); 1346 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new IOException()); 1347 ListenableFuture<Integer> faultTolerantFuture = 1348 catching(originalFuture, Throwable.class, fallback, directExecutor()); 1349 assertEquals(1, (int) getDone(faultTolerantFuture)); 1350 } 1351 1352 @GwtIncompatible // non-Throwable exceptionType 1353 public void testCatching_customTypeMatch() throws Exception { 1354 Function<IOException, Integer> fallback = functionReturningOne(); 1355 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new FileNotFoundException()); 1356 ListenableFuture<Integer> faultTolerantFuture = 1357 catching(originalFuture, IOException.class, fallback, directExecutor()); 1358 assertEquals(1, (int) getDone(faultTolerantFuture)); 1359 } 1360 1361 @GwtIncompatible // non-Throwable exceptionType 1362 public void testCatching_customTypeNoMatch() throws Exception { 1363 Function<IOException, Integer> fallback = functionReturningOne(); 1364 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new RuntimeException()); 1365 ListenableFuture<Integer> faultTolerantFuture = 1366 catching(originalFuture, IOException.class, fallback, directExecutor()); 1367 try { 1368 getDone(faultTolerantFuture); 1369 fail(); 1370 } catch (ExecutionException expected) { 1371 assertThat(expected.getCause()).isInstanceOf(RuntimeException.class); 1372 } 1373 } 1374 1375 @GwtIncompatible // StackOverflowError 1376 public void testCatching_StackOverflow() throws Exception { 1377 { 1378 /* 1379 * Initialize all relevant classes before running the test, which may otherwise poison any 1380 * classes it is trying to load during its stack overflow. 1381 */ 1382 SettableFuture<Object> root = SettableFuture.create(); 1383 ListenableFuture<Object> unused = 1384 catching(root, MyException.class, identity(), directExecutor()); 1385 root.setException(new MyException()); 1386 } 1387 1388 SettableFuture<Object> root = SettableFuture.create(); 1389 ListenableFuture<Object> output = root; 1390 for (int i = 0; i < 10000; i++) { 1391 output = catching(output, MyException.class, identity(), directExecutor()); 1392 } 1393 try { 1394 root.setException(new MyException()); 1395 fail(); 1396 } catch (StackOverflowError expected) { 1397 } 1398 } 1399 1400 public void testCatching_ErrorAfterCancellation() throws Exception { 1401 class Fallback implements Function<Throwable, Object> { 1402 ListenableFuture<Object> output; 1403 1404 @Override 1405 public Object apply(Throwable input) { 1406 output.cancel(false); 1407 throw new MyError(); 1408 } 1409 } 1410 Fallback fallback = new Fallback(); 1411 SettableFuture<Object> input = SettableFuture.create(); 1412 1413 ListenableFuture<Object> output = catching(input, Throwable.class, fallback, directExecutor()); 1414 fallback.output = output; 1415 1416 input.setException(new MyException()); 1417 assertTrue(output.isCancelled()); 1418 } 1419 1420 public void testCatching_ExceptionAfterCancellation() throws Exception { 1421 class Fallback implements Function<Throwable, Object> { 1422 ListenableFuture<Object> output; 1423 1424 @Override 1425 public Object apply(Throwable input) { 1426 output.cancel(false); 1427 throw new MyRuntimeException(); 1428 } 1429 } 1430 Fallback fallback = new Fallback(); 1431 SettableFuture<Object> input = SettableFuture.create(); 1432 1433 ListenableFuture<Object> output = catching(input, Throwable.class, fallback, directExecutor()); 1434 fallback.output = output; 1435 1436 input.setException(new MyException()); 1437 assertTrue(output.isCancelled()); 1438 } 1439 1440 public void testCatching_getThrowsRuntimeException() throws Exception { 1441 ListenableFuture<Object> input = 1442 UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); 1443 1444 // We'd catch only MyRuntimeException.class here, but then the test won't compile under GWT. 1445 ListenableFuture<Object> output = 1446 catching(input, Throwable.class, identity(), directExecutor()); 1447 assertThat(getDone(output)).isInstanceOf(MyRuntimeException.class); 1448 } 1449 1450 public void testCatching_getThrowsError() throws Exception { 1451 ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); 1452 1453 // We'd catch only MyError.class here, but then the test won't compile under GWT. 1454 ListenableFuture<Object> output = 1455 catching(input, Throwable.class, identity(), directExecutor()); 1456 assertThat(getDone(output)).isInstanceOf(MyError.class); 1457 } 1458 1459 public void testCatching_listenerThrowsError() throws Exception { 1460 SettableFuture<Object> input = SettableFuture.create(); 1461 ListenableFuture<Object> output = 1462 catching(input, Throwable.class, identity(), directExecutor()); 1463 1464 output.addListener( 1465 new Runnable() { 1466 @Override 1467 public void run() { 1468 throw new MyError(); 1469 } 1470 }, 1471 directExecutor()); 1472 try { 1473 input.setException(new MyException()); 1474 fail(); 1475 } catch (MyError expected) { 1476 } 1477 } 1478 1479 public void testCatchingAsync_Throwable() throws Exception { 1480 AsyncFunction<Throwable, Integer> fallback = asyncFunctionReturningOne(); 1481 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new IOException()); 1482 ListenableFuture<Integer> faultTolerantFuture = 1483 catchingAsync(originalFuture, Throwable.class, fallback, directExecutor()); 1484 assertEquals(1, (int) getDone(faultTolerantFuture)); 1485 } 1486 1487 @GwtIncompatible // non-Throwable exceptionType 1488 public void testCatchingAsync_customTypeMatch() throws Exception { 1489 AsyncFunction<IOException, Integer> fallback = asyncFunctionReturningOne(); 1490 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new FileNotFoundException()); 1491 ListenableFuture<Integer> faultTolerantFuture = 1492 catchingAsync(originalFuture, IOException.class, fallback, directExecutor()); 1493 assertEquals(1, (int) getDone(faultTolerantFuture)); 1494 } 1495 1496 @GwtIncompatible // non-Throwable exceptionType 1497 public void testCatchingAsync_customTypeNoMatch() throws Exception { 1498 AsyncFunction<IOException, Integer> fallback = asyncFunctionReturningOne(); 1499 ListenableFuture<Integer> originalFuture = immediateFailedFuture(new RuntimeException()); 1500 ListenableFuture<Integer> faultTolerantFuture = 1501 catchingAsync(originalFuture, IOException.class, fallback, directExecutor()); 1502 try { 1503 getDone(faultTolerantFuture); 1504 fail(); 1505 } catch (ExecutionException expected) { 1506 assertThat(expected.getCause()).isInstanceOf(RuntimeException.class); 1507 } 1508 } 1509 1510 @GwtIncompatible // StackOverflowError 1511 public void testCatchingAsync_StackOverflow() throws Exception { 1512 { 1513 /* 1514 * Initialize all relevant classes before running the test, which may otherwise poison any 1515 * classes it is trying to load during its stack overflow. 1516 */ 1517 SettableFuture<Object> root = SettableFuture.create(); 1518 ListenableFuture<Object> unused = 1519 catchingAsync(root, MyException.class, asyncIdentity(), directExecutor()); 1520 root.setException(new MyException()); 1521 } 1522 1523 SettableFuture<Object> root = SettableFuture.create(); 1524 ListenableFuture<Object> output = root; 1525 for (int i = 0; i < 10000; i++) { 1526 output = catchingAsync(output, MyException.class, asyncIdentity(), directExecutor()); 1527 } 1528 try { 1529 root.setException(new MyException()); 1530 fail(); 1531 } catch (StackOverflowError expected) { 1532 } 1533 } 1534 1535 public void testCatchingAsync_ErrorAfterCancellation() throws Exception { 1536 class Fallback implements AsyncFunction<Throwable, Object> { 1537 ListenableFuture<Object> output; 1538 1539 @Override 1540 public ListenableFuture<Object> apply(Throwable input) { 1541 output.cancel(false); 1542 throw new MyError(); 1543 } 1544 } 1545 Fallback fallback = new Fallback(); 1546 SettableFuture<Object> input = SettableFuture.create(); 1547 1548 ListenableFuture<Object> output = 1549 catchingAsync(input, Throwable.class, fallback, directExecutor()); 1550 fallback.output = output; 1551 1552 input.setException(new MyException()); 1553 assertTrue(output.isCancelled()); 1554 } 1555 1556 public void testCatchingAsync_ExceptionAfterCancellation() throws Exception { 1557 class Fallback implements AsyncFunction<Throwable, Object> { 1558 ListenableFuture<Object> output; 1559 1560 @Override 1561 public ListenableFuture<Object> apply(Throwable input) { 1562 output.cancel(false); 1563 throw new MyRuntimeException(); 1564 } 1565 } 1566 Fallback fallback = new Fallback(); 1567 SettableFuture<Object> input = SettableFuture.create(); 1568 1569 ListenableFuture<Object> output = 1570 catchingAsync(input, Throwable.class, fallback, directExecutor()); 1571 fallback.output = output; 1572 1573 input.setException(new MyException()); 1574 assertTrue(output.isCancelled()); 1575 } 1576 1577 public void testCatchingAsync_getThrowsRuntimeException() throws Exception { 1578 ListenableFuture<Object> input = 1579 UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); 1580 1581 // We'd catch only MyRuntimeException.class here, but then the test won't compile under GWT. 1582 ListenableFuture<Object> output = 1583 catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor()); 1584 assertThat(getDone(output)).isInstanceOf(MyRuntimeException.class); 1585 } 1586 1587 public void testCatchingAsync_getThrowsError() throws Exception { 1588 ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); 1589 1590 // We'd catch only MyError.class here, but then the test won't compile under GWT. 1591 ListenableFuture<Object> output = 1592 catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor()); 1593 assertThat(getDone(output)).isInstanceOf(MyError.class); 1594 } 1595 1596 public void testCatchingAsync_listenerThrowsError() throws Exception { 1597 SettableFuture<Object> input = SettableFuture.create(); 1598 ListenableFuture<Object> output = 1599 catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor()); 1600 1601 output.addListener( 1602 new Runnable() { 1603 @Override 1604 public void run() { 1605 throw new MyError(); 1606 } 1607 }, 1608 directExecutor()); 1609 try { 1610 input.setException(new MyException()); 1611 fail(); 1612 } catch (MyError expected) { 1613 } 1614 } 1615 1616 public void testCatching_rejectionPropagatesToOutput() throws Exception { 1617 SettableFuture<String> input = SettableFuture.create(); 1618 ListenableFuture<String> transformed = 1619 catching(input, Throwable.class, constant("foo"), REJECTING_EXECUTOR); 1620 input.setException(new Exception()); 1621 try { 1622 getDone(transformed); 1623 fail(); 1624 } catch (ExecutionException expected) { 1625 assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); 1626 } 1627 } 1628 1629 public void testCatchingAsync_rejectionPropagatesToOutput() throws Exception { 1630 SettableFuture<String> input = SettableFuture.create(); 1631 ListenableFuture<String> transformed = 1632 catchingAsync( 1633 input, 1634 Throwable.class, 1635 constantAsyncFunction(immediateFuture("foo")), 1636 REJECTING_EXECUTOR); 1637 input.setException(new Exception()); 1638 try { 1639 getDone(transformed); 1640 fail(); 1641 } catch (ExecutionException expected) { 1642 assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); 1643 } 1644 } 1645 1646 private <X extends Throwable> Function<X, Integer> functionReturningOne() { 1647 return new Function<X, Integer>() { 1648 @Override 1649 public Integer apply(X t) { 1650 return 1; 1651 } 1652 }; 1653 } 1654 1655 private <X extends Throwable> AsyncFunction<X, Integer> asyncFunctionReturningOne() { 1656 return new AsyncFunction<X, Integer>() { 1657 @Override 1658 public ListenableFuture<Integer> apply(X t) { 1659 return immediateFuture(1); 1660 } 1661 }; 1662 } 1663 1664 private static <I, O> AsyncFunction<I, O> constantAsyncFunction( 1665 final @Nullable ListenableFuture<O> output) { 1666 return new AsyncFunction<I, O>() { 1667 @Override 1668 public ListenableFuture<O> apply(I input) { 1669 return output; 1670 } 1671 }; 1672 } 1673 1674 public void testTransformAsync_genericsWildcard_AsyncFunction() throws Exception { 1675 ListenableFuture<?> nullFuture = immediateFuture(null); 1676 ListenableFuture<?> chainedFuture = 1677 transformAsync(nullFuture, constantAsyncFunction(nullFuture), directExecutor()); 1678 assertNull(getDone(chainedFuture)); 1679 } 1680 1681 public void testTransformAsync_genericsHierarchy_AsyncFunction() throws Exception { 1682 ListenableFuture<FooChild> future = immediateFuture(null); 1683 final BarChild barChild = new BarChild(); 1684 AsyncFunction<Foo, BarChild> function = 1685 new AsyncFunction<Foo, BarChild>() { 1686 @Override 1687 public AbstractFuture<BarChild> apply(Foo unused) { 1688 AbstractFuture<BarChild> future = new AbstractFuture<BarChild>() {}; 1689 future.set(barChild); 1690 return future; 1691 } 1692 }; 1693 Bar bar = getDone(transformAsync(future, function, directExecutor())); 1694 assertSame(barChild, bar); 1695 } 1696 1697 @GwtIncompatible // get() timeout 1698 public void testTransformAsync_asyncFunction_timeout() 1699 throws InterruptedException, ExecutionException { 1700 AsyncFunction<String, Integer> function = constantAsyncFunction(immediateFuture(1)); 1701 ListenableFuture<Integer> future = 1702 transformAsync(SettableFuture.<String>create(), function, directExecutor()); 1703 try { 1704 future.get(1, MILLISECONDS); 1705 fail(); 1706 } catch (TimeoutException expected) { 1707 } 1708 } 1709 1710 public void testTransformAsync_asyncFunction_error() throws InterruptedException { 1711 final Error error = new Error("deliberate"); 1712 AsyncFunction<String, Integer> function = 1713 new AsyncFunction<String, Integer>() { 1714 @Override 1715 public ListenableFuture<Integer> apply(String input) { 1716 throw error; 1717 } 1718 }; 1719 SettableFuture<String> inputFuture = SettableFuture.create(); 1720 ListenableFuture<Integer> outputFuture = 1721 transformAsync(inputFuture, function, directExecutor()); 1722 inputFuture.set("value"); 1723 try { 1724 getDone(outputFuture); 1725 fail(); 1726 } catch (ExecutionException expected) { 1727 assertSame(error, expected.getCause()); 1728 } 1729 } 1730 1731 public void testTransformAsync_asyncFunction_nullInsteadOfFuture() throws Exception { 1732 ListenableFuture<?> inputFuture = immediateFuture("a"); 1733 ListenableFuture<?> chainedFuture = 1734 transformAsync(inputFuture, constantAsyncFunction(null), directExecutor()); 1735 try { 1736 getDone(chainedFuture); 1737 fail(); 1738 } catch (ExecutionException expected) { 1739 NullPointerException cause = (NullPointerException) expected.getCause(); 1740 assertThat(cause) 1741 .hasMessageThat() 1742 .contains( 1743 "AsyncFunction.apply returned null instead of a Future. " 1744 + "Did you mean to return immediateFuture(null)?"); 1745 } 1746 } 1747 1748 @GwtIncompatible // threads 1749 public void testTransformAsync_asyncFunction_cancelledWhileApplyingFunction() 1750 throws InterruptedException, ExecutionException { 1751 final CountDownLatch inFunction = new CountDownLatch(1); 1752 final CountDownLatch functionDone = new CountDownLatch(1); 1753 final SettableFuture<Integer> resultFuture = SettableFuture.create(); 1754 AsyncFunction<String, Integer> function = 1755 new AsyncFunction<String, Integer>() { 1756 @Override 1757 public ListenableFuture<Integer> apply(String input) throws Exception { 1758 inFunction.countDown(); 1759 functionDone.await(); 1760 return resultFuture; 1761 } 1762 }; 1763 SettableFuture<String> inputFuture = SettableFuture.create(); 1764 ListenableFuture<Integer> future = 1765 transformAsync(inputFuture, function, newSingleThreadExecutor()); 1766 inputFuture.set("value"); 1767 inFunction.await(); 1768 future.cancel(false); 1769 functionDone.countDown(); 1770 try { 1771 future.get(); 1772 fail(); 1773 } catch (CancellationException expected) { 1774 } 1775 try { 1776 resultFuture.get(); 1777 fail(); 1778 } catch (CancellationException expected) { 1779 } 1780 } 1781 1782 @GwtIncompatible // threads 1783 public void testTransformAsync_asyncFunction_cancelledBeforeApplyingFunction() 1784 throws InterruptedException { 1785 final AtomicBoolean functionCalled = new AtomicBoolean(); 1786 AsyncFunction<String, Integer> function = 1787 new AsyncFunction<String, Integer>() { 1788 @Override 1789 public ListenableFuture<Integer> apply(String input) throws Exception { 1790 functionCalled.set(true); 1791 return immediateFuture(1); 1792 } 1793 }; 1794 SettableFuture<String> inputFuture = SettableFuture.create(); 1795 ExecutorService executor = newSingleThreadExecutor(); 1796 ListenableFuture<Integer> future = transformAsync(inputFuture, function, executor); 1797 1798 // Pause the executor. 1799 final CountDownLatch beforeFunction = new CountDownLatch(1); 1800 executor.execute( 1801 new Runnable() { 1802 @Override 1803 public void run() { 1804 awaitUninterruptibly(beforeFunction); 1805 } 1806 }); 1807 1808 // Cancel the future after making input available. 1809 inputFuture.set("value"); 1810 future.cancel(false); 1811 1812 // Unpause the executor. 1813 beforeFunction.countDown(); 1814 executor.shutdown(); 1815 assertTrue(executor.awaitTermination(5, SECONDS)); 1816 1817 assertFalse(functionCalled.get()); 1818 } 1819 1820 public void testSubmitAsync_asyncCallable_error() throws InterruptedException { 1821 final Error error = new Error("deliberate"); 1822 AsyncCallable<Integer> callable = 1823 new AsyncCallable<Integer>() { 1824 @Override 1825 public ListenableFuture<Integer> call() { 1826 throw error; 1827 } 1828 }; 1829 SettableFuture<String> inputFuture = SettableFuture.create(); 1830 ListenableFuture<Integer> outputFuture = submitAsync(callable, directExecutor()); 1831 inputFuture.set("value"); 1832 try { 1833 getDone(outputFuture); 1834 fail(); 1835 } catch (ExecutionException expected) { 1836 assertSame(error, expected.getCause()); 1837 } 1838 } 1839 1840 public void testSubmitAsync_asyncCallable_nullInsteadOfFuture() throws Exception { 1841 ListenableFuture<?> chainedFuture = submitAsync(constantAsyncCallable(null), directExecutor()); 1842 try { 1843 getDone(chainedFuture); 1844 fail(); 1845 } catch (ExecutionException expected) { 1846 NullPointerException cause = (NullPointerException) expected.getCause(); 1847 assertThat(cause) 1848 .hasMessageThat() 1849 .contains( 1850 "AsyncCallable.call returned null instead of a Future. " 1851 + "Did you mean to return immediateFuture(null)?"); 1852 } 1853 } 1854 1855 @GwtIncompatible // threads 1856 public void testSubmitAsync_asyncCallable_cancelledWhileApplyingFunction() 1857 throws InterruptedException, ExecutionException { 1858 final CountDownLatch inFunction = new CountDownLatch(1); 1859 final CountDownLatch callableDone = new CountDownLatch(1); 1860 final SettableFuture<Integer> resultFuture = SettableFuture.create(); 1861 AsyncCallable<Integer> callable = 1862 new AsyncCallable<Integer>() { 1863 @Override 1864 public ListenableFuture<Integer> call() throws InterruptedException { 1865 inFunction.countDown(); 1866 callableDone.await(); 1867 return resultFuture; 1868 } 1869 }; 1870 SettableFuture<String> inputFuture = SettableFuture.create(); 1871 ListenableFuture<Integer> future = submitAsync(callable, newSingleThreadExecutor()); 1872 inputFuture.set("value"); 1873 inFunction.await(); 1874 future.cancel(false); 1875 callableDone.countDown(); 1876 try { 1877 future.get(); 1878 fail(); 1879 } catch (CancellationException expected) { 1880 } 1881 try { 1882 resultFuture.get(); 1883 fail(); 1884 } catch (CancellationException expected) { 1885 } 1886 } 1887 1888 @GwtIncompatible // threads 1889 public void testSubmitAsync_asyncCallable_cancelledBeforeApplyingFunction() 1890 throws InterruptedException { 1891 final AtomicBoolean callableCalled = new AtomicBoolean(); 1892 AsyncCallable<Integer> callable = 1893 new AsyncCallable<Integer>() { 1894 @Override 1895 public ListenableFuture<Integer> call() { 1896 callableCalled.set(true); 1897 return immediateFuture(1); 1898 } 1899 }; 1900 ExecutorService executor = newSingleThreadExecutor(); 1901 // Pause the executor. 1902 final CountDownLatch beforeFunction = new CountDownLatch(1); 1903 executor.execute( 1904 new Runnable() { 1905 @Override 1906 public void run() { 1907 awaitUninterruptibly(beforeFunction); 1908 } 1909 }); 1910 ListenableFuture<Integer> future = submitAsync(callable, executor); 1911 future.cancel(false); 1912 1913 // Unpause the executor. 1914 beforeFunction.countDown(); 1915 executor.shutdown(); 1916 assertTrue(executor.awaitTermination(5, SECONDS)); 1917 1918 assertFalse(callableCalled.get()); 1919 } 1920 1921 @GwtIncompatible // threads 1922 public void testSubmitAsync_asyncCallable_returnsInterruptedFuture() throws InterruptedException { 1923 assertThat(Thread.interrupted()).isFalse(); 1924 SettableFuture<Integer> cancelledFuture = SettableFuture.create(); 1925 cancelledFuture.cancel(true); 1926 assertThat(Thread.interrupted()).isFalse(); 1927 ListenableFuture<Integer> future = 1928 submitAsync(constantAsyncCallable(cancelledFuture), directExecutor()); 1929 assertThat(future.isDone()).isTrue(); 1930 assertThat(Thread.interrupted()).isFalse(); 1931 } 1932 1933 public void testSubmit_callable_returnsValue() throws Exception { 1934 Callable<Integer> callable = 1935 new Callable<Integer>() { 1936 @Override 1937 public Integer call() { 1938 return 42; 1939 } 1940 }; 1941 ListenableFuture<Integer> future = submit(callable, directExecutor()); 1942 assertThat(future.isDone()).isTrue(); 1943 assertThat(getDone(future)).isEqualTo(42); 1944 } 1945 1946 public void testSubmit_callable_throwsException() { 1947 final Exception exception = new Exception("Exception for testing"); 1948 Callable<Integer> callable = 1949 new Callable<Integer>() { 1950 @Override 1951 public Integer call() throws Exception { 1952 throw exception; 1953 } 1954 }; 1955 ListenableFuture<Integer> future = submit(callable, directExecutor()); 1956 try { 1957 getDone(future); 1958 fail(); 1959 } catch (ExecutionException expected) { 1960 assertThat(expected).hasCauseThat().isSameInstanceAs(exception); 1961 } 1962 } 1963 1964 public void testSubmit_runnable_completesAfterRun() throws Exception { 1965 final List<Runnable> pendingRunnables = newArrayList(); 1966 final List<Runnable> executedRunnables = newArrayList(); 1967 Runnable runnable = 1968 new Runnable() { 1969 @Override 1970 public void run() { 1971 executedRunnables.add(this); 1972 } 1973 }; 1974 Executor executor = 1975 new Executor() { 1976 @Override 1977 public void execute(Runnable runnable) { 1978 pendingRunnables.add(runnable); 1979 } 1980 }; 1981 ListenableFuture<@Nullable Void> future = submit(runnable, executor); 1982 assertThat(future.isDone()).isFalse(); 1983 assertThat(executedRunnables).isEmpty(); 1984 assertThat(pendingRunnables).hasSize(1); 1985 pendingRunnables.remove(0).run(); 1986 assertThat(future.isDone()).isTrue(); 1987 assertThat(executedRunnables).containsExactly(runnable); 1988 assertThat(pendingRunnables).isEmpty(); 1989 } 1990 1991 public void testSubmit_runnable_throwsException() throws Exception { 1992 final RuntimeException exception = new RuntimeException("Exception for testing"); 1993 Runnable runnable = 1994 new Runnable() { 1995 @Override 1996 public void run() { 1997 throw exception; 1998 } 1999 }; 2000 ListenableFuture<@Nullable Void> future = submit(runnable, directExecutor()); 2001 try { 2002 getDone(future); 2003 fail(); 2004 } catch (ExecutionException expected) { 2005 assertThat(expected).hasCauseThat().isSameInstanceAs(exception); 2006 } 2007 } 2008 2009 @GwtIncompatible // threads 2010 public void testScheduleAsync_asyncCallable_error() throws InterruptedException { 2011 final Error error = new Error("deliberate"); 2012 AsyncCallable<Integer> callable = 2013 new AsyncCallable<Integer>() { 2014 @Override 2015 public ListenableFuture<Integer> call() { 2016 throw error; 2017 } 2018 }; 2019 SettableFuture<String> inputFuture = SettableFuture.create(); 2020 ListenableFuture<Integer> outputFuture = submitAsync(callable, directExecutor()); 2021 inputFuture.set("value"); 2022 try { 2023 getDone(outputFuture); 2024 fail(); 2025 } catch (ExecutionException expected) { 2026 assertSame(error, expected.getCause()); 2027 } 2028 } 2029 2030 @GwtIncompatible // threads 2031 public void testScheduleAsync_asyncCallable_nullInsteadOfFuture() throws Exception { 2032 ListenableFuture<?> chainedFuture = 2033 scheduleAsync( 2034 constantAsyncCallable(null), 1, NANOSECONDS, newSingleThreadScheduledExecutor()); 2035 try { 2036 chainedFuture.get(); 2037 fail(); 2038 } catch (ExecutionException expected) { 2039 NullPointerException cause = (NullPointerException) expected.getCause(); 2040 assertThat(cause) 2041 .hasMessageThat() 2042 .contains( 2043 "AsyncCallable.call returned null instead of a Future. " 2044 + "Did you mean to return immediateFuture(null)?"); 2045 } 2046 } 2047 2048 @GwtIncompatible // threads 2049 public void testScheduleAsync_asyncCallable_cancelledWhileApplyingFunction() 2050 throws InterruptedException, ExecutionException { 2051 final CountDownLatch inFunction = new CountDownLatch(1); 2052 final CountDownLatch callableDone = new CountDownLatch(1); 2053 final SettableFuture<Integer> resultFuture = SettableFuture.create(); 2054 AsyncCallable<Integer> callable = 2055 new AsyncCallable<Integer>() { 2056 @Override 2057 public ListenableFuture<Integer> call() throws InterruptedException { 2058 inFunction.countDown(); 2059 callableDone.await(); 2060 return resultFuture; 2061 } 2062 }; 2063 ListenableFuture<Integer> future = 2064 scheduleAsync(callable, 1, NANOSECONDS, newSingleThreadScheduledExecutor()); 2065 inFunction.await(); 2066 future.cancel(false); 2067 callableDone.countDown(); 2068 try { 2069 future.get(); 2070 fail(); 2071 } catch (CancellationException expected) { 2072 } 2073 try { 2074 resultFuture.get(); 2075 fail(); 2076 } catch (CancellationException expected) { 2077 } 2078 } 2079 2080 @GwtIncompatible // threads 2081 public void testScheduleAsync_asyncCallable_cancelledBeforeCallingFunction() 2082 throws InterruptedException { 2083 final AtomicBoolean callableCalled = new AtomicBoolean(); 2084 AsyncCallable<Integer> callable = 2085 new AsyncCallable<Integer>() { 2086 @Override 2087 public ListenableFuture<Integer> call() { 2088 callableCalled.set(true); 2089 return immediateFuture(1); 2090 } 2091 }; 2092 ScheduledExecutorService executor = newSingleThreadScheduledExecutor(); 2093 // Pause the executor. 2094 final CountDownLatch beforeFunction = new CountDownLatch(1); 2095 executor.execute( 2096 new Runnable() { 2097 @Override 2098 public void run() { 2099 awaitUninterruptibly(beforeFunction); 2100 } 2101 }); 2102 ListenableFuture<Integer> future = scheduleAsync(callable, 1, NANOSECONDS, executor); 2103 future.cancel(false); 2104 2105 // Unpause the executor. 2106 beforeFunction.countDown(); 2107 executor.shutdown(); 2108 assertTrue(executor.awaitTermination(5, SECONDS)); 2109 2110 assertFalse(callableCalled.get()); 2111 } 2112 2113 private static <T> AsyncCallable<T> constantAsyncCallable( 2114 final @Nullable ListenableFuture<T> returnValue) { 2115 return new AsyncCallable<T>() { 2116 @Override 2117 public ListenableFuture<T> call() { 2118 return returnValue; 2119 } 2120 }; 2121 } 2122 2123 /** Runnable which can be called a single time, and only after {@link #expectCall} is called. */ 2124 // TODO(cpovirk): top-level class? 2125 private static class SingleCallListener implements Runnable { 2126 2127 private boolean expectCall = false; 2128 private final AtomicBoolean called = new AtomicBoolean(); 2129 2130 @Override 2131 public void run() { 2132 assertTrue("Listener called before it was expected", expectCall); 2133 assertFalse("Listener called more than once", wasCalled()); 2134 called.set(true); 2135 } 2136 2137 public void expectCall() { 2138 assertFalse("expectCall is already true", expectCall); 2139 expectCall = true; 2140 } 2141 2142 public boolean wasCalled() { 2143 return called.get(); 2144 } 2145 } 2146 2147 public void testAllAsList() throws Exception { 2148 // Create input and output 2149 SettableFuture<String> future1 = SettableFuture.create(); 2150 SettableFuture<String> future2 = SettableFuture.create(); 2151 SettableFuture<String> future3 = SettableFuture.create(); 2152 @SuppressWarnings("unchecked") // array is never modified 2153 ListenableFuture<List<String>> compound = allAsList(future1, future2, future3); 2154 2155 // Attach a listener 2156 SingleCallListener listener = new SingleCallListener(); 2157 compound.addListener(listener, directExecutor()); 2158 2159 // Satisfy each input and check the output 2160 assertFalse(compound.isDone()); 2161 future1.set(DATA1); 2162 assertFalse(compound.isDone()); 2163 future2.set(DATA2); 2164 assertFalse(compound.isDone()); 2165 listener.expectCall(); 2166 future3.set(DATA3); 2167 assertTrue(listener.wasCalled()); 2168 2169 List<String> results = getDone(compound); 2170 assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder(); 2171 } 2172 2173 public void testAllAsList_emptyList() throws Exception { 2174 SingleCallListener listener = new SingleCallListener(); 2175 listener.expectCall(); 2176 List<ListenableFuture<String>> futures = ImmutableList.of(); 2177 ListenableFuture<List<String>> compound = allAsList(futures); 2178 compound.addListener(listener, directExecutor()); 2179 assertThat(getDone(compound)).isEmpty(); 2180 assertTrue(listener.wasCalled()); 2181 } 2182 2183 public void testAllAsList_emptyArray() throws Exception { 2184 SingleCallListener listener = new SingleCallListener(); 2185 listener.expectCall(); 2186 @SuppressWarnings("unchecked") // array is never modified 2187 ListenableFuture<List<String>> compound = allAsList(); 2188 compound.addListener(listener, directExecutor()); 2189 assertThat(getDone(compound)).isEmpty(); 2190 assertTrue(listener.wasCalled()); 2191 } 2192 2193 public void testAllAsList_failure() throws Exception { 2194 SingleCallListener listener = new SingleCallListener(); 2195 SettableFuture<String> future1 = SettableFuture.create(); 2196 SettableFuture<String> future2 = SettableFuture.create(); 2197 @SuppressWarnings("unchecked") // array is never modified 2198 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2199 compound.addListener(listener, directExecutor()); 2200 2201 listener.expectCall(); 2202 Throwable exception = new Throwable("failed1"); 2203 future1.setException(exception); 2204 assertTrue(compound.isDone()); 2205 assertTrue(listener.wasCalled()); 2206 assertFalse(future2.isDone()); 2207 2208 try { 2209 getDone(compound); 2210 fail(); 2211 } catch (ExecutionException expected) { 2212 assertSame(exception, expected.getCause()); 2213 } 2214 } 2215 2216 public void testAllAsList_singleFailure() throws Exception { 2217 Throwable exception = new Throwable("failed"); 2218 ListenableFuture<String> future = immediateFailedFuture(exception); 2219 ListenableFuture<List<String>> compound = allAsList(ImmutableList.of(future)); 2220 2221 try { 2222 getDone(compound); 2223 fail(); 2224 } catch (ExecutionException expected) { 2225 assertSame(exception, expected.getCause()); 2226 } 2227 } 2228 2229 public void testAllAsList_immediateFailure() throws Exception { 2230 Throwable exception = new Throwable("failed"); 2231 ListenableFuture<String> future1 = immediateFailedFuture(exception); 2232 ListenableFuture<String> future2 = immediateFuture("results"); 2233 ListenableFuture<List<String>> compound = allAsList(ImmutableList.of(future1, future2)); 2234 2235 try { 2236 getDone(compound); 2237 fail(); 2238 } catch (ExecutionException expected) { 2239 assertSame(exception, expected.getCause()); 2240 } 2241 } 2242 2243 public void testAllAsList_error() throws Exception { 2244 Error error = new Error("deliberate"); 2245 SettableFuture<String> future1 = SettableFuture.create(); 2246 ListenableFuture<String> future2 = immediateFuture("results"); 2247 ListenableFuture<List<String>> compound = allAsList(ImmutableList.of(future1, future2)); 2248 2249 future1.setException(error); 2250 try { 2251 getDone(compound); 2252 fail(); 2253 } catch (ExecutionException expected) { 2254 assertSame(error, expected.getCause()); 2255 } 2256 } 2257 2258 public void testAllAsList_cancelled() throws Exception { 2259 SingleCallListener listener = new SingleCallListener(); 2260 SettableFuture<String> future1 = SettableFuture.create(); 2261 SettableFuture<String> future2 = SettableFuture.create(); 2262 @SuppressWarnings("unchecked") // array is never modified 2263 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2264 compound.addListener(listener, directExecutor()); 2265 2266 listener.expectCall(); 2267 future1.cancel(true); 2268 assertTrue(compound.isDone()); 2269 assertTrue(listener.wasCalled()); 2270 assertFalse(future2.isDone()); 2271 2272 try { 2273 getDone(compound); 2274 fail(); 2275 } catch (CancellationException expected) { 2276 } 2277 } 2278 2279 public void testAllAsList_resultCancelled() throws Exception { 2280 SettableFuture<String> future1 = SettableFuture.create(); 2281 SettableFuture<String> future2 = SettableFuture.create(); 2282 @SuppressWarnings("unchecked") // array is never modified 2283 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2284 2285 future2.set(DATA2); 2286 assertFalse(compound.isDone()); 2287 assertTrue(compound.cancel(false)); 2288 assertTrue(compound.isCancelled()); 2289 assertTrue(future1.isCancelled()); 2290 assertFalse(future1.wasInterrupted()); 2291 } 2292 2293 public void testAllAsList_resultCancelledInterrupted_withSecondaryListFuture() throws Exception { 2294 SettableFuture<String> future1 = SettableFuture.create(); 2295 SettableFuture<String> future2 = SettableFuture.create(); 2296 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2297 // There was a bug where the event listener for the combined future would 2298 // result in the sub-futures being cancelled without being interrupted. 2299 ListenableFuture<List<String>> otherCompound = allAsList(future1, future2); 2300 2301 assertTrue(compound.cancel(true)); 2302 assertTrue(future1.isCancelled()); 2303 assertTrue(future1.wasInterrupted()); 2304 assertTrue(future2.isCancelled()); 2305 assertTrue(future2.wasInterrupted()); 2306 assertTrue(otherCompound.isCancelled()); 2307 } 2308 2309 public void testAllAsList_resultCancelled_withSecondaryListFuture() throws Exception { 2310 SettableFuture<String> future1 = SettableFuture.create(); 2311 SettableFuture<String> future2 = SettableFuture.create(); 2312 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2313 // This next call is "unused," but it is an important part of the test. Don't remove it! 2314 ListenableFuture<List<String>> unused = allAsList(future1, future2); 2315 2316 assertTrue(compound.cancel(false)); 2317 assertTrue(future1.isCancelled()); 2318 assertFalse(future1.wasInterrupted()); 2319 assertTrue(future2.isCancelled()); 2320 assertFalse(future2.wasInterrupted()); 2321 } 2322 2323 public void testAllAsList_resultInterrupted() throws Exception { 2324 SettableFuture<String> future1 = SettableFuture.create(); 2325 SettableFuture<String> future2 = SettableFuture.create(); 2326 @SuppressWarnings("unchecked") // array is never modified 2327 ListenableFuture<List<String>> compound = allAsList(future1, future2); 2328 2329 future2.set(DATA2); 2330 assertFalse(compound.isDone()); 2331 assertTrue(compound.cancel(true)); 2332 assertTrue(compound.isCancelled()); 2333 assertTrue(future1.isCancelled()); 2334 assertTrue(future1.wasInterrupted()); 2335 } 2336 2337 /** 2338 * Test the case where the futures are fulfilled prior to constructing the ListFuture. There was a 2339 * bug where the loop that connects a Listener to each of the futures would die on the last 2340 * loop-check as done() on ListFuture nulled out the variable being looped over (the list of 2341 * futures). 2342 */ 2343 public void testAllAsList_doneFutures() throws Exception { 2344 // Create input and output 2345 SettableFuture<String> future1 = SettableFuture.create(); 2346 SettableFuture<String> future2 = SettableFuture.create(); 2347 SettableFuture<String> future3 = SettableFuture.create(); 2348 2349 // Satisfy each input prior to creating compound and check the output 2350 future1.set(DATA1); 2351 future2.set(DATA2); 2352 future3.set(DATA3); 2353 2354 @SuppressWarnings("unchecked") // array is never modified 2355 ListenableFuture<List<String>> compound = allAsList(future1, future2, future3); 2356 2357 // Attach a listener 2358 SingleCallListener listener = new SingleCallListener(); 2359 listener.expectCall(); 2360 compound.addListener(listener, directExecutor()); 2361 2362 assertTrue(listener.wasCalled()); 2363 2364 List<String> results = getDone(compound); 2365 assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder(); 2366 } 2367 2368 /** A single non-error failure is not logged because it is reported via the output future. */ 2369 @SuppressWarnings("unchecked") 2370 public void testAllAsList_logging_exception() throws Exception { 2371 try { 2372 getDone(allAsList(immediateFailedFuture(new MyException()))); 2373 fail(); 2374 } catch (ExecutionException expected) { 2375 assertThat(expected.getCause()).isInstanceOf(MyException.class); 2376 assertEquals( 2377 "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size()); 2378 } 2379 } 2380 2381 /** Ensure that errors are always logged. */ 2382 @SuppressWarnings("unchecked") 2383 public void testAllAsList_logging_error() throws Exception { 2384 try { 2385 getDone(allAsList(immediateFailedFuture(new MyError()))); 2386 fail(); 2387 } catch (ExecutionException expected) { 2388 assertThat(expected.getCause()).isInstanceOf(MyError.class); 2389 List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); 2390 assertThat(logged).hasSize(1); // errors are always logged 2391 assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class); 2392 } 2393 } 2394 2395 /** All as list will log extra exceptions that have already occurred. */ 2396 @SuppressWarnings("unchecked") 2397 public void testAllAsList_logging_multipleExceptions_alreadyDone() throws Exception { 2398 try { 2399 getDone( 2400 allAsList( 2401 immediateFailedFuture(new MyException()), immediateFailedFuture(new MyException()))); 2402 fail(); 2403 } catch (ExecutionException expected) { 2404 assertThat(expected.getCause()).isInstanceOf(MyException.class); 2405 List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); 2406 assertThat(logged).hasSize(1); // the second failure is logged 2407 assertThat(logged.get(0).getThrown()).isInstanceOf(MyException.class); 2408 } 2409 } 2410 2411 /** All as list will log extra exceptions that occur later. */ 2412 @SuppressWarnings("unchecked") 2413 public void testAllAsList_logging_multipleExceptions_doneLater() throws Exception { 2414 SettableFuture<Object> future1 = SettableFuture.create(); 2415 SettableFuture<Object> future2 = SettableFuture.create(); 2416 SettableFuture<Object> future3 = SettableFuture.create(); 2417 ListenableFuture<List<Object>> all = allAsList(future1, future2, future3); 2418 2419 future1.setException(new MyException()); 2420 future2.setException(new MyException()); 2421 future3.setException(new MyException()); 2422 2423 try { 2424 getDone(all); 2425 fail(); 2426 } catch (ExecutionException expected) { 2427 List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); 2428 assertThat(logged).hasSize(2); // failures after the first are logged 2429 assertThat(logged.get(0).getThrown()).isInstanceOf(MyException.class); 2430 assertThat(logged.get(1).getThrown()).isInstanceOf(MyException.class); 2431 } 2432 } 2433 2434 /** The same exception happening on multiple futures should not be logged. */ 2435 @SuppressWarnings("unchecked") 2436 public void testAllAsList_logging_same_exception() throws Exception { 2437 try { 2438 MyException sameInstance = new MyException(); 2439 getDone(allAsList(immediateFailedFuture(sameInstance), immediateFailedFuture(sameInstance))); 2440 fail(); 2441 } catch (ExecutionException expected) { 2442 assertThat(expected.getCause()).isInstanceOf(MyException.class); 2443 assertEquals( 2444 "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size()); 2445 } 2446 } 2447 2448 public void testAllAsList_logging_seenExceptionUpdateRace() throws Exception { 2449 final MyException sameInstance = new MyException(); 2450 SettableFuture<Object> firstFuture = SettableFuture.create(); 2451 final SettableFuture<Object> secondFuture = SettableFuture.create(); 2452 ListenableFuture<List<Object>> bulkFuture = allAsList(firstFuture, secondFuture); 2453 2454 bulkFuture.addListener( 2455 new Runnable() { 2456 @Override 2457 public void run() { 2458 /* 2459 * firstFuture just completed, but AggregateFuture hasn't yet had time to record the 2460 * exception in seenExceptions. When we complete secondFuture with the same exception, 2461 * we want for AggregateFuture to still detect that it's been previously seen. 2462 */ 2463 secondFuture.setException(sameInstance); 2464 } 2465 }, 2466 directExecutor()); 2467 firstFuture.setException(sameInstance); 2468 2469 try { 2470 getDone(bulkFuture); 2471 fail(); 2472 } catch (ExecutionException expected) { 2473 assertThat(expected.getCause()).isInstanceOf(MyException.class); 2474 assertThat(aggregateFutureLogHandler.getStoredLogRecords()).isEmpty(); 2475 } 2476 } 2477 2478 public void testAllAsList_logging_seenExceptionUpdateCancelRace() throws Exception { 2479 final MyException subsequentFailure = new MyException(); 2480 SettableFuture<Object> firstFuture = SettableFuture.create(); 2481 final SettableFuture<Object> secondFuture = SettableFuture.create(); 2482 ListenableFuture<List<Object>> bulkFuture = allAsList(firstFuture, secondFuture); 2483 2484 bulkFuture.addListener( 2485 new Runnable() { 2486 @Override 2487 public void run() { 2488 /* 2489 * This is similar to the above test, but this time we're making sure that we recognize 2490 * that the output Future is done early not because of an exception but because of a 2491 * cancellation. 2492 */ 2493 secondFuture.setException(subsequentFailure); 2494 } 2495 }, 2496 directExecutor()); 2497 firstFuture.cancel(false); 2498 2499 try { 2500 getDone(bulkFuture); 2501 fail(); 2502 } catch (CancellationException expected) { 2503 assertThat(getOnlyElement(aggregateFutureLogHandler.getStoredLogRecords()).getThrown()) 2504 .isSameInstanceAs(subsequentFailure); 2505 } 2506 } 2507 2508 /** 2509 * Different exceptions happening on multiple futures with the same cause should not be logged. 2510 */ 2511 @SuppressWarnings("unchecked") 2512 public void testAllAsList_logging_same_cause() throws Exception { 2513 try { 2514 MyException exception1 = new MyException(); 2515 MyException exception2 = new MyException(); 2516 MyException exception3 = new MyException(); 2517 2518 MyException sameInstance = new MyException(); 2519 exception1.initCause(sameInstance); 2520 exception2.initCause(sameInstance); 2521 exception3.initCause(exception2); 2522 getDone(allAsList(immediateFailedFuture(exception1), immediateFailedFuture(exception3))); 2523 fail(); 2524 } catch (ExecutionException expected) { 2525 assertThat(expected.getCause()).isInstanceOf(MyException.class); 2526 assertEquals( 2527 "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size()); 2528 } 2529 } 2530 2531 private static String createCombinedResult(Integer i, Boolean b) { 2532 return "-" + i + "-" + b; 2533 } 2534 2535 @GwtIncompatible // threads 2536 public void testWhenAllComplete_noLeakInterruption() throws Exception { 2537 final SettableFuture<String> stringFuture = SettableFuture.create(); 2538 AsyncCallable<String> combiner = 2539 new AsyncCallable<String>() { 2540 @Override 2541 public ListenableFuture<String> call() throws Exception { 2542 return stringFuture; 2543 } 2544 }; 2545 2546 ListenableFuture<String> futureResult = whenAllComplete().callAsync(combiner, directExecutor()); 2547 2548 assertThat(Thread.interrupted()).isFalse(); 2549 futureResult.cancel(true); 2550 assertThat(Thread.interrupted()).isFalse(); 2551 } 2552 2553 public void testWhenAllComplete_wildcard() throws Exception { 2554 ListenableFuture<?> futureA = immediateFuture("a"); 2555 ListenableFuture<?> futureB = immediateFuture("b"); 2556 ListenableFuture<?>[] futures = new ListenableFuture<?>[0]; 2557 Callable<String> combiner = 2558 new Callable<String>() { 2559 @Override 2560 public String call() throws Exception { 2561 return "hi"; 2562 } 2563 }; 2564 2565 // We'd like for all the following to compile. 2566 ListenableFuture<String> unused; 2567 2568 // Compiles: 2569 unused = whenAllComplete(futureA, futureB).call(combiner, directExecutor()); 2570 2571 // Does not compile: 2572 // unused = whenAllComplete(futures).call(combiner); 2573 2574 // Workaround for the above: 2575 unused = whenAllComplete(asList(futures)).call(combiner, directExecutor()); 2576 } 2577 2578 @GwtIncompatible // threads 2579 public void testWhenAllComplete_asyncResult() throws Exception { 2580 SettableFuture<Integer> futureInteger = SettableFuture.create(); 2581 SettableFuture<Boolean> futureBoolean = SettableFuture.create(); 2582 2583 final ExecutorService executor = newSingleThreadExecutor(); 2584 final CountDownLatch callableBlocking = new CountDownLatch(1); 2585 final SettableFuture<String> resultOfCombiner = SettableFuture.create(); 2586 AsyncCallable<String> combiner = 2587 tagged( 2588 "Called my toString", 2589 new AsyncCallable<String>() { 2590 @Override 2591 public ListenableFuture<String> call() throws Exception { 2592 // Make this executor terminate after this task so that the test can tell when 2593 // futureResult has received resultOfCombiner. 2594 executor.shutdown(); 2595 callableBlocking.await(); 2596 return resultOfCombiner; 2597 } 2598 }); 2599 2600 ListenableFuture<String> futureResult = 2601 whenAllComplete(futureInteger, futureBoolean).callAsync(combiner, executor); 2602 2603 // Waiting on backing futures 2604 assertThat(futureResult.toString()) 2605 .matches( 2606 "CombinedFuture@\\w+\\[status=PENDING," 2607 + " info=\\[futures=\\[SettableFuture@\\w+\\[status=PENDING]," 2608 + " SettableFuture@\\w+\\[status=PENDING]]]]"); 2609 Integer integerPartial = 1; 2610 futureInteger.set(integerPartial); 2611 assertThat(futureResult.toString()) 2612 .matches( 2613 "CombinedFuture@\\w+\\[status=PENDING," 2614 + " info=\\[futures=\\[SettableFuture@\\w+\\[status=SUCCESS," 2615 + " result=\\[java.lang.Integer@\\w+]], SettableFuture@\\w+\\[status=PENDING]]]]"); 2616 2617 // Backing futures complete 2618 Boolean booleanPartial = true; 2619 futureBoolean.set(booleanPartial); 2620 // Once the backing futures are done there's a (brief) moment where we know nothing 2621 assertThat(futureResult.toString()).matches("CombinedFuture@\\w+\\[status=PENDING]"); 2622 callableBlocking.countDown(); 2623 // Need to wait for resultFuture to be returned. 2624 assertTrue(executor.awaitTermination(10, SECONDS)); 2625 // But once the async function has returned a future we can include that in the toString 2626 assertThat(futureResult.toString()) 2627 .matches( 2628 "CombinedFuture@\\w+\\[status=PENDING," 2629 + " setFuture=\\[SettableFuture@\\w+\\[status=PENDING]]]"); 2630 2631 // Future complete 2632 resultOfCombiner.set(createCombinedResult(getDone(futureInteger), getDone(futureBoolean))); 2633 String expectedResult = createCombinedResult(integerPartial, booleanPartial); 2634 assertEquals(expectedResult, futureResult.get()); 2635 assertThat(futureResult.toString()) 2636 .matches("CombinedFuture@\\w+\\[status=SUCCESS, result=\\[java.lang.String@\\w+]]"); 2637 } 2638 2639 public void testWhenAllComplete_asyncError() throws Exception { 2640 final Exception thrown = new RuntimeException("test"); 2641 2642 final SettableFuture<Integer> futureInteger = SettableFuture.create(); 2643 final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); 2644 AsyncCallable<String> combiner = 2645 new AsyncCallable<String>() { 2646 @Override 2647 public ListenableFuture<String> call() throws Exception { 2648 assertTrue(futureInteger.isDone()); 2649 assertTrue(futureBoolean.isDone()); 2650 return immediateFailedFuture(thrown); 2651 } 2652 }; 2653 2654 ListenableFuture<String> futureResult = 2655 whenAllComplete(futureInteger, futureBoolean).callAsync(combiner, directExecutor()); 2656 Integer integerPartial = 1; 2657 futureInteger.set(integerPartial); 2658 Boolean booleanPartial = true; 2659 futureBoolean.set(booleanPartial); 2660 2661 try { 2662 getDone(futureResult); 2663 fail(); 2664 } catch (ExecutionException expected) { 2665 assertSame(thrown, expected.getCause()); 2666 } 2667 } 2668 2669 @GwtIncompatible // threads 2670 public void testWhenAllComplete_cancelledNotInterrupted() throws Exception { 2671 SettableFuture<String> stringFuture = SettableFuture.create(); 2672 SettableFuture<Boolean> booleanFuture = SettableFuture.create(); 2673 final CountDownLatch inFunction = new CountDownLatch(1); 2674 final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); 2675 final SettableFuture<String> resultFuture = SettableFuture.create(); 2676 AsyncCallable<String> combiner = 2677 new AsyncCallable<String>() { 2678 @Override 2679 public ListenableFuture<String> call() throws Exception { 2680 inFunction.countDown(); 2681 shouldCompleteFunction.await(); 2682 return resultFuture; 2683 } 2684 }; 2685 2686 ListenableFuture<String> futureResult = 2687 whenAllComplete(stringFuture, booleanFuture).callAsync(combiner, newSingleThreadExecutor()); 2688 2689 stringFuture.set("value"); 2690 booleanFuture.set(true); 2691 inFunction.await(); 2692 futureResult.cancel(false); 2693 shouldCompleteFunction.countDown(); 2694 try { 2695 futureResult.get(); 2696 fail(); 2697 } catch (CancellationException expected) { 2698 } 2699 2700 try { 2701 resultFuture.get(); 2702 fail(); 2703 } catch (CancellationException expected) { 2704 } 2705 } 2706 2707 @GwtIncompatible // threads 2708 public void testWhenAllComplete_interrupted() throws Exception { 2709 SettableFuture<String> stringFuture = SettableFuture.create(); 2710 SettableFuture<Boolean> booleanFuture = SettableFuture.create(); 2711 final CountDownLatch inFunction = new CountDownLatch(1); 2712 final CountDownLatch gotException = new CountDownLatch(1); 2713 AsyncCallable<String> combiner = 2714 new AsyncCallable<String>() { 2715 @Override 2716 public ListenableFuture<String> call() throws Exception { 2717 inFunction.countDown(); 2718 try { 2719 new CountDownLatch(1).await(); // wait for interrupt 2720 } catch (InterruptedException expected) { 2721 gotException.countDown(); 2722 throw expected; 2723 } 2724 return immediateFuture("a"); 2725 } 2726 }; 2727 2728 ListenableFuture<String> futureResult = 2729 whenAllComplete(stringFuture, booleanFuture).callAsync(combiner, newSingleThreadExecutor()); 2730 2731 stringFuture.set("value"); 2732 booleanFuture.set(true); 2733 inFunction.await(); 2734 futureResult.cancel(true); 2735 try { 2736 futureResult.get(); 2737 fail(); 2738 } catch (CancellationException expected) { 2739 } 2740 gotException.await(); 2741 } 2742 2743 public void testWhenAllComplete_runnableResult() throws Exception { 2744 final SettableFuture<Integer> futureInteger = SettableFuture.create(); 2745 final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); 2746 final String[] result = new String[1]; 2747 Runnable combiner = 2748 new Runnable() { 2749 @Override 2750 public void run() { 2751 assertTrue(futureInteger.isDone()); 2752 assertTrue(futureBoolean.isDone()); 2753 result[0] = 2754 createCombinedResult( 2755 Futures.getUnchecked(futureInteger), Futures.getUnchecked(futureBoolean)); 2756 } 2757 }; 2758 2759 ListenableFuture<?> futureResult = 2760 whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor()); 2761 Integer integerPartial = 1; 2762 futureInteger.set(integerPartial); 2763 Boolean booleanPartial = true; 2764 futureBoolean.set(booleanPartial); 2765 futureResult.get(); 2766 assertEquals(createCombinedResult(integerPartial, booleanPartial), result[0]); 2767 } 2768 2769 public void testWhenAllComplete_runnableError() throws Exception { 2770 final RuntimeException thrown = new RuntimeException("test"); 2771 2772 final SettableFuture<Integer> futureInteger = SettableFuture.create(); 2773 final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); 2774 Runnable combiner = 2775 new Runnable() { 2776 @Override 2777 public void run() { 2778 assertTrue(futureInteger.isDone()); 2779 assertTrue(futureBoolean.isDone()); 2780 throw thrown; 2781 } 2782 }; 2783 2784 ListenableFuture<?> futureResult = 2785 whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor()); 2786 Integer integerPartial = 1; 2787 futureInteger.set(integerPartial); 2788 Boolean booleanPartial = true; 2789 futureBoolean.set(booleanPartial); 2790 2791 try { 2792 getDone(futureResult); 2793 fail(); 2794 } catch (ExecutionException expected) { 2795 assertSame(thrown, expected.getCause()); 2796 } 2797 } 2798 2799 @GwtIncompatible // threads 2800 public void testWhenAllCompleteRunnable_resultCanceledWithoutInterrupt_doesNotInterruptRunnable() 2801 throws Exception { 2802 SettableFuture<String> stringFuture = SettableFuture.create(); 2803 SettableFuture<Boolean> booleanFuture = SettableFuture.create(); 2804 final CountDownLatch inFunction = new CountDownLatch(1); 2805 final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); 2806 final CountDownLatch combinerCompletedWithoutInterrupt = new CountDownLatch(1); 2807 Runnable combiner = 2808 new Runnable() { 2809 @Override 2810 public void run() { 2811 inFunction.countDown(); 2812 try { 2813 shouldCompleteFunction.await(); 2814 combinerCompletedWithoutInterrupt.countDown(); 2815 } catch (InterruptedException e) { 2816 // Ensure the thread's interrupt status is preserved. 2817 Thread.currentThread().interrupt(); 2818 throw new RuntimeException(e); 2819 } 2820 } 2821 }; 2822 2823 ListenableFuture<?> futureResult = 2824 whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor()); 2825 2826 stringFuture.set("value"); 2827 booleanFuture.set(true); 2828 inFunction.await(); 2829 futureResult.cancel(false); 2830 shouldCompleteFunction.countDown(); 2831 try { 2832 futureResult.get(); 2833 fail(); 2834 } catch (CancellationException expected) { 2835 } 2836 combinerCompletedWithoutInterrupt.await(); 2837 } 2838 2839 @GwtIncompatible // threads 2840 public void testWhenAllCompleteRunnable_resultCanceledWithInterrupt_InterruptsRunnable() 2841 throws Exception { 2842 SettableFuture<String> stringFuture = SettableFuture.create(); 2843 SettableFuture<Boolean> booleanFuture = SettableFuture.create(); 2844 final CountDownLatch inFunction = new CountDownLatch(1); 2845 final CountDownLatch gotException = new CountDownLatch(1); 2846 Runnable combiner = 2847 new Runnable() { 2848 @Override 2849 public void run() { 2850 inFunction.countDown(); 2851 try { 2852 new CountDownLatch(1).await(); // wait for interrupt 2853 } catch (InterruptedException expected) { 2854 // Ensure the thread's interrupt status is preserved. 2855 Thread.currentThread().interrupt(); 2856 gotException.countDown(); 2857 } 2858 } 2859 }; 2860 2861 ListenableFuture<?> futureResult = 2862 whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor()); 2863 2864 stringFuture.set("value"); 2865 booleanFuture.set(true); 2866 inFunction.await(); 2867 futureResult.cancel(true); 2868 try { 2869 futureResult.get(); 2870 fail(); 2871 } catch (CancellationException expected) { 2872 } 2873 gotException.await(); 2874 } 2875 2876 public void testWhenAllSucceed() throws Exception { 2877 class PartialResultException extends Exception {} 2878 2879 final SettableFuture<Integer> futureInteger = SettableFuture.create(); 2880 final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); 2881 AsyncCallable<String> combiner = 2882 new AsyncCallable<String>() { 2883 @Override 2884 public ListenableFuture<String> call() throws Exception { 2885 throw new AssertionFailedError("AsyncCallable should not have been called."); 2886 } 2887 }; 2888 2889 ListenableFuture<String> futureResult = 2890 whenAllSucceed(futureInteger, futureBoolean).callAsync(combiner, directExecutor()); 2891 PartialResultException partialResultException = new PartialResultException(); 2892 futureInteger.setException(partialResultException); 2893 Boolean booleanPartial = true; 2894 futureBoolean.set(booleanPartial); 2895 try { 2896 getDone(futureResult); 2897 fail(); 2898 } catch (ExecutionException expected) { 2899 assertSame(partialResultException, expected.getCause()); 2900 } 2901 } 2902 2903 @AndroidIncompatible 2904 @GwtIncompatible 2905 public void testWhenAllSucceed_releasesInputFuturesUponSubmission() throws Exception { 2906 SettableFuture<Long> future1 = SettableFuture.create(); 2907 SettableFuture<Long> future2 = SettableFuture.create(); 2908 WeakReference<SettableFuture<Long>> future1Ref = new WeakReference<>(future1); 2909 WeakReference<SettableFuture<Long>> future2Ref = new WeakReference<>(future2); 2910 2911 Callable<Long> combiner = 2912 new Callable<Long>() { 2913 @Override 2914 public Long call() { 2915 throw new AssertionError(); 2916 } 2917 }; 2918 2919 ListenableFuture<Long> unused = 2920 whenAllSucceed(future1, future2).call(combiner, noOpScheduledExecutor()); 2921 2922 future1.set(1L); 2923 future1 = null; 2924 future2.set(2L); 2925 future2 = null; 2926 2927 /* 2928 * Futures should be collected even if combiner never runs. This is kind of a silly test, since 2929 * the combiner is almost certain to hold its own reference to the futures, and a real app would 2930 * hold a reference to the executor and thus to the combiner. What we really care about is that 2931 * the futures are released once the combiner is done running. But we happen to provide this 2932 * earlier cleanup at the moment, so we're testing it. 2933 */ 2934 GcFinalization.awaitClear(future1Ref); 2935 GcFinalization.awaitClear(future2Ref); 2936 } 2937 2938 @AndroidIncompatible 2939 @GwtIncompatible 2940 public void testWhenAllComplete_releasesInputFuturesUponCancellation() throws Exception { 2941 SettableFuture<Long> future = SettableFuture.create(); 2942 WeakReference<SettableFuture<Long>> futureRef = new WeakReference<>(future); 2943 2944 Callable<Long> combiner = 2945 new Callable<Long>() { 2946 @Override 2947 public Long call() { 2948 throw new AssertionError(); 2949 } 2950 }; 2951 2952 ListenableFuture<Long> unused = whenAllComplete(future).call(combiner, noOpScheduledExecutor()); 2953 2954 unused.cancel(false); 2955 future = null; 2956 2957 // Future should be collected because whenAll*Complete* doesn't need to look at its result. 2958 GcFinalization.awaitClear(futureRef); 2959 } 2960 2961 @AndroidIncompatible 2962 @GwtIncompatible 2963 public void testWhenAllSucceed_releasesCallable() throws Exception { 2964 AsyncCallable<Long> combiner = 2965 new AsyncCallable<Long>() { 2966 @Override 2967 public ListenableFuture<Long> call() { 2968 return SettableFuture.create(); 2969 } 2970 }; 2971 WeakReference<AsyncCallable<Long>> combinerRef = new WeakReference<>(combiner); 2972 2973 ListenableFuture<Long> unused = 2974 whenAllSucceed(immediateFuture(1L)).callAsync(combiner, directExecutor()); 2975 2976 combiner = null; 2977 // combiner should be collected even if the future it returns never completes. 2978 GcFinalization.awaitClear(combinerRef); 2979 } 2980 2981 /* 2982 * TODO(cpovirk): maybe pass around TestFuture instances instead of 2983 * ListenableFuture instances 2984 */ 2985 2986 /** 2987 * A future in {@link TestFutureBatch} that also has a name for debugging purposes and a {@code 2988 * finisher}, a task that will complete the future in some fashion when it is called, allowing for 2989 * testing both before and after the completion of the future. 2990 */ 2991 @GwtIncompatible // used only in GwtIncompatible tests 2992 private static final class TestFuture { 2993 2994 final ListenableFuture<String> future; 2995 final String name; 2996 final Runnable finisher; 2997 2998 TestFuture(ListenableFuture<String> future, String name, Runnable finisher) { 2999 this.future = future; 3000 this.name = name; 3001 this.finisher = finisher; 3002 } 3003 } 3004 3005 /** 3006 * A collection of several futures, covering cancellation, success, and failure (both {@link 3007 * ExecutionException} and {@link RuntimeException}), both immediate and delayed. We use each 3008 * possible pair of these futures in {@link FuturesTest#runExtensiveMergerTest}. 3009 * 3010 * <p>Each test requires a new {@link TestFutureBatch} because we need new delayed futures each 3011 * time, as the old delayed futures were completed as part of the old test. 3012 */ 3013 @GwtIncompatible // used only in GwtIncompatible tests 3014 private static final class TestFutureBatch { 3015 3016 final ListenableFuture<String> doneSuccess = immediateFuture("a"); 3017 final ListenableFuture<String> doneFailed = immediateFailedFuture(new Exception()); 3018 final SettableFuture<String> doneCancelled = SettableFuture.create(); 3019 3020 { 3021 doneCancelled.cancel(true); 3022 } 3023 3024 final ListenableFuture<String> doneRuntimeException = 3025 new ForwardingListenableFuture<String>() { 3026 final ListenableFuture<String> delegate = immediateFuture("Should never be seen"); 3027 3028 @Override 3029 protected ListenableFuture<String> delegate() { 3030 return delegate; 3031 } 3032 3033 @Override 3034 public String get() { 3035 throw new RuntimeException(); 3036 } 3037 3038 @Override 3039 public String get(long timeout, TimeUnit unit) { 3040 throw new RuntimeException(); 3041 } 3042 }; 3043 3044 final SettableFuture<String> delayedSuccess = SettableFuture.create(); 3045 final SettableFuture<String> delayedFailed = SettableFuture.create(); 3046 final SettableFuture<String> delayedCancelled = SettableFuture.create(); 3047 3048 final SettableFuture<String> delegateForDelayedRuntimeException = SettableFuture.create(); 3049 final ListenableFuture<String> delayedRuntimeException = 3050 new ForwardingListenableFuture<String>() { 3051 @Override 3052 protected ListenableFuture<String> delegate() { 3053 return delegateForDelayedRuntimeException; 3054 } 3055 3056 @Override 3057 public String get() throws ExecutionException, InterruptedException { 3058 delegateForDelayedRuntimeException.get(); 3059 throw new RuntimeException(); 3060 } 3061 3062 @Override 3063 public String get(long timeout, TimeUnit unit) 3064 throws ExecutionException, InterruptedException, TimeoutException { 3065 delegateForDelayedRuntimeException.get(timeout, unit); 3066 throw new RuntimeException(); 3067 } 3068 }; 3069 3070 final Runnable doNothing = 3071 new Runnable() { 3072 @Override 3073 public void run() {} 3074 }; 3075 final Runnable finishSuccess = 3076 new Runnable() { 3077 @Override 3078 public void run() { 3079 delayedSuccess.set("b"); 3080 } 3081 }; 3082 final Runnable finishFailure = 3083 new Runnable() { 3084 @Override 3085 public void run() { 3086 delayedFailed.setException(new Exception()); 3087 } 3088 }; 3089 final Runnable finishCancelled = 3090 new Runnable() { 3091 @Override 3092 public void run() { 3093 delayedCancelled.cancel(true); 3094 } 3095 }; 3096 final Runnable finishRuntimeException = 3097 new Runnable() { 3098 @Override 3099 public void run() { 3100 delegateForDelayedRuntimeException.set("Should never be seen"); 3101 } 3102 }; 3103 3104 /** All the futures, together with human-readable names for use by {@link #smartToString}. */ 3105 final ImmutableList<TestFuture> allFutures = 3106 ImmutableList.of( 3107 new TestFuture(doneSuccess, "doneSuccess", doNothing), 3108 new TestFuture(doneFailed, "doneFailed", doNothing), 3109 new TestFuture(doneCancelled, "doneCancelled", doNothing), 3110 new TestFuture(doneRuntimeException, "doneRuntimeException", doNothing), 3111 new TestFuture(delayedSuccess, "delayedSuccess", finishSuccess), 3112 new TestFuture(delayedFailed, "delayedFailed", finishFailure), 3113 new TestFuture(delayedCancelled, "delayedCancelled", finishCancelled), 3114 new TestFuture( 3115 delayedRuntimeException, "delayedRuntimeException", finishRuntimeException)); 3116 3117 final Function<ListenableFuture<String>, String> nameGetter = 3118 new Function<ListenableFuture<String>, String>() { 3119 @Override 3120 public String apply(ListenableFuture<String> input) { 3121 for (TestFuture future : allFutures) { 3122 if (future.future == input) { 3123 return future.name; 3124 } 3125 } 3126 throw new IllegalArgumentException(input.toString()); 3127 } 3128 }; 3129 3130 static boolean intersect(Set<?> a, Set<?> b) { 3131 return !intersection(a, b).isEmpty(); 3132 } 3133 3134 /** 3135 * Like {@code inputs.toString()}, but with the nonsense {@code toString} representations 3136 * replaced with the name of each future from {@link #allFutures}. 3137 */ 3138 String smartToString(ImmutableSet<ListenableFuture<String>> inputs) { 3139 Iterable<String> inputNames = Iterables.transform(inputs, nameGetter); 3140 return Joiner.on(", ").join(inputNames); 3141 } 3142 3143 void smartAssertTrue( 3144 ImmutableSet<ListenableFuture<String>> inputs, Exception cause, boolean expression) { 3145 if (!expression) { 3146 throw failureWithCause(cause, smartToString(inputs)); 3147 } 3148 } 3149 3150 boolean hasDelayed(ListenableFuture<String> a, ListenableFuture<String> b) { 3151 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3152 return intersect( 3153 inputs, 3154 ImmutableSet.of( 3155 delayedSuccess, delayedFailed, delayedCancelled, delayedRuntimeException)); 3156 } 3157 3158 void assertHasDelayed(ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { 3159 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3160 smartAssertTrue(inputs, e, hasDelayed(a, b)); 3161 } 3162 3163 void assertHasFailure(ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { 3164 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3165 smartAssertTrue( 3166 inputs, 3167 e, 3168 intersect( 3169 inputs, 3170 ImmutableSet.of( 3171 doneFailed, doneRuntimeException, delayedFailed, delayedRuntimeException))); 3172 } 3173 3174 void assertHasCancel(ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { 3175 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3176 smartAssertTrue( 3177 inputs, e, intersect(inputs, ImmutableSet.of(doneCancelled, delayedCancelled))); 3178 } 3179 3180 void assertHasImmediateFailure( 3181 ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { 3182 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3183 smartAssertTrue( 3184 inputs, e, intersect(inputs, ImmutableSet.of(doneFailed, doneRuntimeException))); 3185 } 3186 3187 void assertHasImmediateCancel( 3188 ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { 3189 ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); 3190 smartAssertTrue(inputs, e, intersect(inputs, ImmutableSet.of(doneCancelled))); 3191 } 3192 } 3193 3194 /** 3195 * {@link Futures#allAsList(Iterable)} or {@link Futures#successfulAsList(Iterable)}, hidden 3196 * behind a common interface for testing. 3197 */ 3198 @GwtIncompatible // used only in GwtIncompatible tests 3199 private interface Merger { 3200 3201 ListenableFuture<List<String>> merged(ListenableFuture<String> a, ListenableFuture<String> b); 3202 3203 Merger allMerger = 3204 new Merger() { 3205 @Override 3206 public ListenableFuture<List<String>> merged( 3207 ListenableFuture<String> a, ListenableFuture<String> b) { 3208 return allAsList(ImmutableSet.of(a, b)); 3209 } 3210 }; 3211 Merger successMerger = 3212 new Merger() { 3213 @Override 3214 public ListenableFuture<List<String>> merged( 3215 ListenableFuture<String> a, ListenableFuture<String> b) { 3216 return successfulAsList(ImmutableSet.of(a, b)); 3217 } 3218 }; 3219 } 3220 3221 /** 3222 * Very rough equivalent of a timed get, produced by calling the no-arg get method in another 3223 * thread and waiting a short time for it. 3224 * 3225 * <p>We need this to test the behavior of no-arg get methods without hanging the main test thread 3226 * forever in the case of failure. 3227 */ 3228 @CanIgnoreReturnValue 3229 @GwtIncompatible // threads 3230 static <V> V pseudoTimedGetUninterruptibly(final Future<V> input, long timeout, TimeUnit unit) 3231 throws ExecutionException, TimeoutException { 3232 ExecutorService executor = newSingleThreadExecutor(); 3233 Future<V> waiter = 3234 executor.submit( 3235 new Callable<V>() { 3236 @Override 3237 public V call() throws Exception { 3238 return input.get(); 3239 } 3240 }); 3241 3242 try { 3243 return getUninterruptibly(waiter, timeout, unit); 3244 } catch (ExecutionException e) { 3245 propagateIfInstanceOf(e.getCause(), ExecutionException.class); 3246 propagateIfInstanceOf(e.getCause(), CancellationException.class); 3247 throw failureWithCause(e, "Unexpected exception"); 3248 } finally { 3249 executor.shutdownNow(); 3250 // TODO(cpovirk): assertTrue(awaitTerminationUninterruptibly(executor, 10, SECONDS)); 3251 } 3252 } 3253 3254 /** 3255 * For each possible pair of futures from {@link TestFutureBatch}, for each possible completion 3256 * order of those futures, test that various get calls (timed before future completion, untimed 3257 * before future completion, and untimed after future completion) return or throw the proper 3258 * values. 3259 */ 3260 @GwtIncompatible // used only in GwtIncompatible tests 3261 private static void runExtensiveMergerTest(Merger merger) throws InterruptedException { 3262 int inputCount = new TestFutureBatch().allFutures.size(); 3263 3264 for (int i = 0; i < inputCount; i++) { 3265 for (int j = 0; j < inputCount; j++) { 3266 for (boolean iBeforeJ : new boolean[] {true, false}) { 3267 TestFutureBatch inputs = new TestFutureBatch(); 3268 ListenableFuture<String> iFuture = inputs.allFutures.get(i).future; 3269 ListenableFuture<String> jFuture = inputs.allFutures.get(j).future; 3270 ListenableFuture<List<String>> future = merger.merged(iFuture, jFuture); 3271 3272 // Test timed get before we've completed any delayed futures. 3273 try { 3274 List<String> result = future.get(0, MILLISECONDS); 3275 assertTrue("Got " + result, asList("a", null).containsAll(result)); 3276 } catch (CancellationException e) { 3277 assertTrue(merger == Merger.allMerger); 3278 inputs.assertHasImmediateCancel(iFuture, jFuture, e); 3279 } catch (ExecutionException e) { 3280 assertTrue(merger == Merger.allMerger); 3281 inputs.assertHasImmediateFailure(iFuture, jFuture, e); 3282 } catch (TimeoutException e) { 3283 inputs.assertHasDelayed(iFuture, jFuture, e); 3284 } 3285 3286 // Same tests with pseudoTimedGet. 3287 try { 3288 List<String> result = 3289 conditionalPseudoTimedGetUninterruptibly( 3290 inputs, iFuture, jFuture, future, 20, MILLISECONDS); 3291 assertTrue("Got " + result, asList("a", null).containsAll(result)); 3292 } catch (CancellationException e) { 3293 assertTrue(merger == Merger.allMerger); 3294 inputs.assertHasImmediateCancel(iFuture, jFuture, e); 3295 } catch (ExecutionException e) { 3296 assertTrue(merger == Merger.allMerger); 3297 inputs.assertHasImmediateFailure(iFuture, jFuture, e); 3298 } catch (TimeoutException e) { 3299 inputs.assertHasDelayed(iFuture, jFuture, e); 3300 } 3301 3302 // Finish the two futures in the currently specified order: 3303 inputs.allFutures.get(iBeforeJ ? i : j).finisher.run(); 3304 inputs.allFutures.get(iBeforeJ ? j : i).finisher.run(); 3305 3306 // Test untimed get now that we've completed any delayed futures. 3307 try { 3308 List<String> result = getDone(future); 3309 assertTrue("Got " + result, asList("a", "b", null).containsAll(result)); 3310 } catch (CancellationException e) { 3311 assertTrue(merger == Merger.allMerger); 3312 inputs.assertHasCancel(iFuture, jFuture, e); 3313 } catch (ExecutionException e) { 3314 assertTrue(merger == Merger.allMerger); 3315 inputs.assertHasFailure(iFuture, jFuture, e); 3316 } 3317 } 3318 } 3319 } 3320 } 3321 3322 /** 3323 * Call the non-timed {@link Future#get()} in a way that allows us to abort if it's expected to 3324 * hang forever. More precisely, if it's expected to return, we simply call it[*], but if it's 3325 * expected to hang (because one of the input futures that we know makes it up isn't done yet), 3326 * then we call it in a separate thread (using pseudoTimedGet). The result is that we wait as long 3327 * as necessary when the method is expected to return (at the cost of hanging forever if there is 3328 * a bug in the class under test) but that we time out fairly promptly when the method is expected 3329 * to hang (possibly too quickly, but too-quick failures should be very unlikely, given that we 3330 * used to bail after 20ms during the expected-successful tests, and there we saw a failure rate 3331 * of ~1/5000, meaning that the other thread's get() call nearly always completes within 20ms if 3332 * it's going to complete at all). 3333 * 3334 * <p>[*] To avoid hangs, I've disabled the in-thread calls. This makes the test take (very 3335 * roughly) 2.5s longer. (2.5s is also the maximum length of time we will wait for a timed get 3336 * that is expected to succeed; the fact that the numbers match is only a coincidence.) See the 3337 * comment below for how to restore the fast but hang-y version. 3338 */ 3339 @GwtIncompatible // used only in GwtIncompatible tests 3340 private static List<String> conditionalPseudoTimedGetUninterruptibly( 3341 TestFutureBatch inputs, 3342 ListenableFuture<String> iFuture, 3343 ListenableFuture<String> jFuture, 3344 ListenableFuture<List<String>> future, 3345 int timeout, 3346 TimeUnit unit) 3347 throws ExecutionException, TimeoutException { 3348 /* 3349 * For faster tests (that may hang indefinitely if the class under test has 3350 * a bug!), switch the second branch to call untimed future.get() instead of 3351 * pseudoTimedGet. 3352 */ 3353 return (inputs.hasDelayed(iFuture, jFuture)) 3354 ? pseudoTimedGetUninterruptibly(future, timeout, unit) 3355 : pseudoTimedGetUninterruptibly(future, 2500, MILLISECONDS); 3356 } 3357 3358 @GwtIncompatible // threads 3359 public void testAllAsList_extensive() throws InterruptedException { 3360 runExtensiveMergerTest(Merger.allMerger); 3361 } 3362 3363 @GwtIncompatible // threads 3364 public void testSuccessfulAsList_extensive() throws InterruptedException { 3365 runExtensiveMergerTest(Merger.successMerger); 3366 } 3367 3368 public void testSuccessfulAsList() throws Exception { 3369 // Create input and output 3370 SettableFuture<String> future1 = SettableFuture.create(); 3371 SettableFuture<String> future2 = SettableFuture.create(); 3372 SettableFuture<String> future3 = SettableFuture.create(); 3373 @SuppressWarnings("unchecked") // array is never modified 3374 ListenableFuture<List<String>> compound = successfulAsList(future1, future2, future3); 3375 3376 // Attach a listener 3377 SingleCallListener listener = new SingleCallListener(); 3378 compound.addListener(listener, directExecutor()); 3379 3380 // Satisfy each input and check the output 3381 assertFalse(compound.isDone()); 3382 future1.set(DATA1); 3383 assertFalse(compound.isDone()); 3384 future2.set(DATA2); 3385 assertFalse(compound.isDone()); 3386 listener.expectCall(); 3387 future3.set(DATA3); 3388 assertTrue(listener.wasCalled()); 3389 3390 List<String> results = getDone(compound); 3391 assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder(); 3392 } 3393 3394 public void testSuccessfulAsList_emptyList() throws Exception { 3395 SingleCallListener listener = new SingleCallListener(); 3396 listener.expectCall(); 3397 List<ListenableFuture<String>> futures = ImmutableList.of(); 3398 ListenableFuture<List<String>> compound = successfulAsList(futures); 3399 compound.addListener(listener, directExecutor()); 3400 assertThat(getDone(compound)).isEmpty(); 3401 assertTrue(listener.wasCalled()); 3402 } 3403 3404 public void testSuccessfulAsList_emptyArray() throws Exception { 3405 SingleCallListener listener = new SingleCallListener(); 3406 listener.expectCall(); 3407 @SuppressWarnings("unchecked") // array is never modified 3408 ListenableFuture<List<String>> compound = successfulAsList(); 3409 compound.addListener(listener, directExecutor()); 3410 assertThat(getDone(compound)).isEmpty(); 3411 assertTrue(listener.wasCalled()); 3412 } 3413 3414 public void testSuccessfulAsList_partialFailure() throws Exception { 3415 SingleCallListener listener = new SingleCallListener(); 3416 SettableFuture<String> future1 = SettableFuture.create(); 3417 SettableFuture<String> future2 = SettableFuture.create(); 3418 @SuppressWarnings("unchecked") // array is never modified 3419 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3420 compound.addListener(listener, directExecutor()); 3421 3422 assertFalse(compound.isDone()); 3423 future1.setException(new Throwable("failed1")); 3424 assertFalse(compound.isDone()); 3425 listener.expectCall(); 3426 future2.set(DATA2); 3427 assertTrue(listener.wasCalled()); 3428 3429 List<String> results = getDone(compound); 3430 assertThat(results).containsExactly(null, DATA2).inOrder(); 3431 } 3432 3433 public void testSuccessfulAsList_totalFailure() throws Exception { 3434 SingleCallListener listener = new SingleCallListener(); 3435 SettableFuture<String> future1 = SettableFuture.create(); 3436 SettableFuture<String> future2 = SettableFuture.create(); 3437 @SuppressWarnings("unchecked") // array is never modified 3438 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3439 compound.addListener(listener, directExecutor()); 3440 3441 assertFalse(compound.isDone()); 3442 future1.setException(new Throwable("failed1")); 3443 assertFalse(compound.isDone()); 3444 listener.expectCall(); 3445 future2.setException(new Throwable("failed2")); 3446 assertTrue(listener.wasCalled()); 3447 3448 List<String> results = getDone(compound); 3449 assertThat(results).containsExactly(null, null).inOrder(); 3450 } 3451 3452 public void testSuccessfulAsList_cancelled() throws Exception { 3453 SingleCallListener listener = new SingleCallListener(); 3454 SettableFuture<String> future1 = SettableFuture.create(); 3455 SettableFuture<String> future2 = SettableFuture.create(); 3456 @SuppressWarnings("unchecked") // array is never modified 3457 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3458 compound.addListener(listener, directExecutor()); 3459 3460 assertFalse(compound.isDone()); 3461 future1.cancel(true); 3462 assertFalse(compound.isDone()); 3463 listener.expectCall(); 3464 future2.set(DATA2); 3465 assertTrue(listener.wasCalled()); 3466 3467 List<String> results = getDone(compound); 3468 assertThat(results).containsExactly(null, DATA2).inOrder(); 3469 } 3470 3471 public void testSuccessfulAsList_resultCancelled() throws Exception { 3472 SettableFuture<String> future1 = SettableFuture.create(); 3473 SettableFuture<String> future2 = SettableFuture.create(); 3474 @SuppressWarnings("unchecked") // array is never modified 3475 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3476 3477 future2.set(DATA2); 3478 assertFalse(compound.isDone()); 3479 assertTrue(compound.cancel(false)); 3480 assertTrue(compound.isCancelled()); 3481 assertTrue(future1.isCancelled()); 3482 assertFalse(future1.wasInterrupted()); 3483 } 3484 3485 public void testSuccessfulAsList_resultCancelledRacingInputDone() throws Exception { 3486 TestLogHandler listenerLoggerHandler = new TestLogHandler(); 3487 Logger exceptionLogger = Logger.getLogger(AbstractFuture.class.getName()); 3488 exceptionLogger.addHandler(listenerLoggerHandler); 3489 try { 3490 doTestSuccessfulAsList_resultCancelledRacingInputDone(); 3491 3492 assertWithMessage("Nothing should be logged") 3493 .that(listenerLoggerHandler.getStoredLogRecords()) 3494 .isEmpty(); 3495 } finally { 3496 exceptionLogger.removeHandler(listenerLoggerHandler); 3497 } 3498 } 3499 3500 private static void doTestSuccessfulAsList_resultCancelledRacingInputDone() throws Exception { 3501 // Simple (combined.cancel -> input.cancel -> setOneValue): 3502 successfulAsList(ImmutableList.of(SettableFuture.create())).cancel(true); 3503 3504 /* 3505 * Complex (combined.cancel -> input.cancel -> other.set -> setOneValue), 3506 * to show that this isn't just about problems with the input future we just 3507 * cancelled: 3508 */ 3509 final SettableFuture<String> future1 = SettableFuture.create(); 3510 final SettableFuture<String> future2 = SettableFuture.create(); 3511 @SuppressWarnings("unchecked") // array is never modified 3512 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3513 3514 future1.addListener( 3515 new Runnable() { 3516 @Override 3517 public void run() { 3518 assertTrue(future1.isCancelled()); 3519 /* 3520 * This test relies on behavior that's unspecified but currently 3521 * guaranteed by the implementation: Cancellation of inputs is 3522 * performed in the order they were provided to the constructor. Verify 3523 * that as a sanity check: 3524 */ 3525 assertFalse(future2.isCancelled()); 3526 // Now attempt to trigger the exception: 3527 future2.set(DATA2); 3528 } 3529 }, 3530 directExecutor()); 3531 assertTrue(compound.cancel(false)); 3532 assertTrue(compound.isCancelled()); 3533 assertTrue(future1.isCancelled()); 3534 assertFalse(future2.isCancelled()); 3535 3536 try { 3537 getDone(compound); 3538 fail(); 3539 } catch (CancellationException expected) { 3540 } 3541 } 3542 3543 public void testSuccessfulAsList_resultInterrupted() throws Exception { 3544 SettableFuture<String> future1 = SettableFuture.create(); 3545 SettableFuture<String> future2 = SettableFuture.create(); 3546 @SuppressWarnings("unchecked") // array is never modified 3547 ListenableFuture<List<String>> compound = successfulAsList(future1, future2); 3548 3549 future2.set(DATA2); 3550 assertFalse(compound.isDone()); 3551 assertTrue(compound.cancel(true)); 3552 assertTrue(compound.isCancelled()); 3553 assertTrue(future1.isCancelled()); 3554 assertTrue(future1.wasInterrupted()); 3555 } 3556 3557 public void testSuccessfulAsList_mixed() throws Exception { 3558 SingleCallListener listener = new SingleCallListener(); 3559 SettableFuture<String> future1 = SettableFuture.create(); 3560 SettableFuture<String> future2 = SettableFuture.create(); 3561 SettableFuture<String> future3 = SettableFuture.create(); 3562 @SuppressWarnings("unchecked") // array is never modified 3563 ListenableFuture<List<String>> compound = successfulAsList(future1, future2, future3); 3564 compound.addListener(listener, directExecutor()); 3565 3566 // First is cancelled, second fails, third succeeds 3567 assertFalse(compound.isDone()); 3568 future1.cancel(true); 3569 assertFalse(compound.isDone()); 3570 future2.setException(new Throwable("failed2")); 3571 assertFalse(compound.isDone()); 3572 listener.expectCall(); 3573 future3.set(DATA3); 3574 assertTrue(listener.wasCalled()); 3575 3576 List<String> results = getDone(compound); 3577 assertThat(results).containsExactly(null, null, DATA3).inOrder(); 3578 } 3579 3580 /** Non-Error exceptions are never logged. */ 3581 @SuppressWarnings("unchecked") 3582 public void testSuccessfulAsList_logging_exception() throws Exception { 3583 assertEquals( 3584 newArrayList((Object) null), 3585 getDone(successfulAsList(immediateFailedFuture(new MyException())))); 3586 assertWithMessage("Nothing should be logged") 3587 .that(aggregateFutureLogHandler.getStoredLogRecords()) 3588 .isEmpty(); 3589 3590 // Not even if there are a bunch of failures. 3591 assertEquals( 3592 newArrayList(null, null, null), 3593 getDone( 3594 successfulAsList( 3595 immediateFailedFuture(new MyException()), 3596 immediateFailedFuture(new MyException()), 3597 immediateFailedFuture(new MyException())))); 3598 assertWithMessage("Nothing should be logged") 3599 .that(aggregateFutureLogHandler.getStoredLogRecords()) 3600 .isEmpty(); 3601 } 3602 3603 /** Ensure that errors are always logged. */ 3604 @SuppressWarnings("unchecked") 3605 public void testSuccessfulAsList_logging_error() throws Exception { 3606 assertEquals( 3607 newArrayList((Object) null), 3608 getDone(successfulAsList(immediateFailedFuture(new MyError())))); 3609 List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); 3610 assertThat(logged).hasSize(1); // errors are always logged 3611 assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class); 3612 } 3613 3614 public void testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled() throws Exception { 3615 ListenableFuture<String> input = new CancelPanickingFuture<>(); 3616 ListenableFuture<List<String>> output = successfulAsList(input); 3617 output.cancel(false); 3618 3619 List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); 3620 assertThat(logged).hasSize(1); 3621 assertThat(logged.get(0).getThrown()).hasMessageThat().isEqualTo("You can't fire me, I quit."); 3622 } 3623 3624 private static final class CancelPanickingFuture<V> extends AbstractFuture<V> { 3625 @Override 3626 public boolean cancel(boolean mayInterruptIfRunning) { 3627 setException(new Error("You can't fire me, I quit.")); 3628 return false; 3629 } 3630 } 3631 3632 public void testNonCancellationPropagating_successful() throws Exception { 3633 SettableFuture<Foo> input = SettableFuture.create(); 3634 ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); 3635 Foo foo = new Foo(); 3636 3637 assertFalse(wrapper.isDone()); 3638 input.set(foo); 3639 assertTrue(wrapper.isDone()); 3640 assertSame(foo, getDone(wrapper)); 3641 } 3642 3643 public void testNonCancellationPropagating_failure() throws Exception { 3644 SettableFuture<Foo> input = SettableFuture.create(); 3645 ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); 3646 Throwable failure = new Throwable("thrown"); 3647 3648 assertFalse(wrapper.isDone()); 3649 input.setException(failure); 3650 try { 3651 getDone(wrapper); 3652 fail(); 3653 } catch (ExecutionException expected) { 3654 assertSame(failure, expected.getCause()); 3655 } 3656 } 3657 3658 public void testNonCancellationPropagating_delegateCancelled() throws Exception { 3659 SettableFuture<Foo> input = SettableFuture.create(); 3660 ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); 3661 3662 assertFalse(wrapper.isDone()); 3663 assertTrue(input.cancel(false)); 3664 assertTrue(wrapper.isCancelled()); 3665 } 3666 3667 public void testNonCancellationPropagating_doesNotPropagate() throws Exception { 3668 SettableFuture<Foo> input = SettableFuture.create(); 3669 ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); 3670 3671 assertTrue(wrapper.cancel(true)); 3672 assertTrue(wrapper.isCancelled()); 3673 assertTrue(wrapper.isDone()); 3674 assertFalse(input.isCancelled()); 3675 assertFalse(input.isDone()); 3676 } 3677 3678 @GwtIncompatible // used only in GwtIncompatible tests 3679 private static class TestException extends Exception { 3680 3681 TestException(@Nullable Throwable cause) { 3682 super(cause); 3683 } 3684 } 3685 3686 @GwtIncompatible // used only in GwtIncompatible tests 3687 private interface MapperFunction extends Function<Throwable, Exception> {} 3688 3689 public void testCompletionOrder() throws Exception { 3690 SettableFuture<Long> future1 = SettableFuture.create(); 3691 SettableFuture<Long> future2 = SettableFuture.create(); 3692 SettableFuture<Long> future3 = SettableFuture.create(); 3693 SettableFuture<Long> future4 = SettableFuture.create(); 3694 SettableFuture<Long> future5 = SettableFuture.create(); 3695 3696 ImmutableList<ListenableFuture<Long>> futures = 3697 inCompletionOrder( 3698 ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5)); 3699 future2.set(1L); 3700 future5.set(2L); 3701 future1.set(3L); 3702 future3.set(4L); 3703 future4.set(5L); 3704 3705 long expectedResult = 1L; 3706 for (ListenableFuture<Long> future : futures) { 3707 assertEquals((Long) expectedResult, getDone(future)); 3708 expectedResult++; 3709 } 3710 } 3711 3712 public void testCompletionOrderExceptionThrown() throws Exception { 3713 SettableFuture<Long> future1 = SettableFuture.create(); 3714 SettableFuture<Long> future2 = SettableFuture.create(); 3715 SettableFuture<Long> future3 = SettableFuture.create(); 3716 SettableFuture<Long> future4 = SettableFuture.create(); 3717 SettableFuture<Long> future5 = SettableFuture.create(); 3718 3719 ImmutableList<ListenableFuture<Long>> futures = 3720 inCompletionOrder( 3721 ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5)); 3722 future2.set(1L); 3723 future5.setException(new IllegalStateException("2L")); 3724 future1.set(3L); 3725 future3.set(4L); 3726 future4.set(5L); 3727 3728 long expectedResult = 1L; 3729 for (ListenableFuture<Long> future : futures) { 3730 if (expectedResult != 2) { 3731 assertEquals((Long) expectedResult, getDone(future)); 3732 } else { 3733 try { 3734 getDone(future); 3735 fail(); 3736 } catch (ExecutionException expected) { 3737 assertThat(expected).hasCauseThat().hasMessageThat().isEqualTo("2L"); 3738 } 3739 } 3740 expectedResult++; 3741 } 3742 } 3743 3744 public void testCompletionOrderFutureCancelled() throws Exception { 3745 SettableFuture<Long> future1 = SettableFuture.create(); 3746 SettableFuture<Long> future2 = SettableFuture.create(); 3747 SettableFuture<Long> future3 = SettableFuture.create(); 3748 SettableFuture<Long> future4 = SettableFuture.create(); 3749 SettableFuture<Long> future5 = SettableFuture.create(); 3750 3751 ImmutableList<ListenableFuture<Long>> futures = 3752 inCompletionOrder( 3753 ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5)); 3754 future2.set(1L); 3755 future5.set(2L); 3756 future1.set(3L); 3757 future3.cancel(true); 3758 future4.set(5L); 3759 3760 long expectedResult = 1L; 3761 for (ListenableFuture<Long> future : futures) { 3762 if (expectedResult != 4) { 3763 assertEquals((Long) expectedResult, getDone(future)); 3764 } else { 3765 try { 3766 getDone(future); 3767 fail(); 3768 } catch (CancellationException expected) { 3769 } 3770 } 3771 expectedResult++; 3772 } 3773 } 3774 3775 public void testCompletionOrderFutureInterruption() throws Exception { 3776 SettableFuture<Long> future1 = SettableFuture.create(); 3777 SettableFuture<Long> future2 = SettableFuture.create(); 3778 SettableFuture<Long> future3 = SettableFuture.create(); 3779 3780 ImmutableList<ListenableFuture<Long>> futures = 3781 inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3)); 3782 future2.set(1L); 3783 3784 futures.get(1).cancel(true); 3785 futures.get(2).cancel(false); 3786 3787 assertTrue(future1.isCancelled()); 3788 assertFalse(future1.wasInterrupted()); 3789 assertTrue(future3.isCancelled()); 3790 assertFalse(future3.wasInterrupted()); 3791 } 3792 3793 public void testCancellingADelegatePropagates() throws Exception { 3794 SettableFuture<Long> future1 = SettableFuture.create(); 3795 SettableFuture<Long> future2 = SettableFuture.create(); 3796 SettableFuture<Long> future3 = SettableFuture.create(); 3797 3798 ImmutableList<ListenableFuture<Long>> delegates = 3799 inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3)); 3800 3801 future1.set(1L); 3802 // Cannot cancel a complete delegate 3803 assertFalse(delegates.get(0).cancel(true)); 3804 // Cancel the delegate before the input future is done 3805 assertTrue(delegates.get(1).cancel(true)); 3806 // Setting the future still works since cancellation didn't propagate 3807 assertTrue(future2.set(2L)); 3808 // Second check to ensure the input future was not cancelled 3809 assertEquals((Long) 2L, getDone(future2)); 3810 3811 // All futures are now complete; outstanding inputs are cancelled 3812 assertTrue(future3.isCancelled()); 3813 assertTrue(future3.wasInterrupted()); 3814 } 3815 3816 @AndroidIncompatible // runs out of memory under some versions of the emulator 3817 public void testCancellingAllDelegatesIsNotQuadratic() throws Exception { 3818 ImmutableList.Builder<SettableFuture<Long>> builder = ImmutableList.builder(); 3819 for (int i = 0; i < 500_000; i++) { 3820 builder.add(SettableFuture.<Long>create()); 3821 } 3822 ImmutableList<SettableFuture<Long>> inputs = builder.build(); 3823 ImmutableList<ListenableFuture<Long>> delegates = inCompletionOrder(inputs); 3824 3825 for (ListenableFuture<?> delegate : delegates) { 3826 delegate.cancel(true); 3827 } 3828 3829 for (ListenableFuture<?> input : inputs) { 3830 assertTrue(input.isDone()); 3831 } 3832 } 3833 3834 @AndroidIncompatible // reference is never cleared under some versions of the emulator 3835 @GwtIncompatible 3836 public void testInputGCedIfUnreferenced() throws Exception { 3837 SettableFuture<Long> future1 = SettableFuture.create(); 3838 SettableFuture<Long> future2 = SettableFuture.create(); 3839 WeakReference<SettableFuture<Long>> future1Ref = new WeakReference<>(future1); 3840 WeakReference<SettableFuture<Long>> future2Ref = new WeakReference<>(future2); 3841 3842 ImmutableList<ListenableFuture<Long>> delegates = 3843 inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2)); 3844 3845 future1.set(1L); 3846 3847 future1 = null; 3848 // First future is complete, should be unreferenced 3849 GcFinalization.awaitClear(future1Ref); 3850 ListenableFuture<Long> outputFuture1 = delegates.get(0); 3851 delegates = null; 3852 future2 = null; 3853 // No references to list or other output future, second future should be unreferenced 3854 GcFinalization.awaitClear(future2Ref); 3855 outputFuture1.get(); 3856 } 3857 3858 // Mostly an example of how it would look like to use a list of mixed types 3859 public void testCompletionOrderMixedBagOTypes() throws Exception { 3860 SettableFuture<Long> future1 = SettableFuture.create(); 3861 SettableFuture<String> future2 = SettableFuture.create(); 3862 SettableFuture<Integer> future3 = SettableFuture.create(); 3863 3864 ImmutableList<? extends ListenableFuture<?>> inputs = 3865 ImmutableList.<ListenableFuture<?>>of(future1, future2, future3); 3866 ImmutableList<ListenableFuture<Object>> futures = inCompletionOrder(inputs); 3867 future2.set("1L"); 3868 future1.set(2L); 3869 future3.set(3); 3870 3871 ImmutableList<?> expected = ImmutableList.of("1L", 2L, 3); 3872 for (int i = 0; i < expected.size(); i++) { 3873 assertEquals(expected.get(i), getDone(futures.get(i))); 3874 } 3875 } 3876 3877 @GwtIncompatible // ClassSanityTester 3878 public void testFutures_nullChecks() throws Exception { 3879 new ClassSanityTester() 3880 .forAllPublicStaticMethods(Futures.class) 3881 .thatReturn(Future.class) 3882 .testNulls(); 3883 } 3884 3885 static AssertionFailedError failureWithCause(Throwable cause, String message) { 3886 AssertionFailedError failure = new AssertionFailedError(message); 3887 failure.initCause(cause); 3888 return failure; 3889 } 3890 3891 // This test covers a bug where an Error thrown from a callback could cause the TimeoutFuture to 3892 // never complete when timing out. Notably, nothing would get logged since the Error would get 3893 // stuck in the ScheduledFuture inside of TimeoutFuture and nothing ever calls get on it. 3894 3895 // Simulate a timeout that fires before the call the SES.schedule returns but the future is 3896 // already completed. 3897 3898 // This test covers a bug where an Error thrown from a callback could cause the TimeoutFuture to 3899 // never complete when timing out. Notably, nothing would get logged since the Error would get 3900 // stuck in the ScheduledFuture inside of TimeoutFuture and nothing ever calls get on it. 3901 3902 private static final Executor REJECTING_EXECUTOR = 3903 new Executor() { 3904 @Override 3905 public void execute(Runnable runnable) { 3906 throw new RejectedExecutionException(); 3907 } 3908 }; 3909 3910 private static <V> AsyncFunction<V, V> asyncIdentity() { 3911 return new AsyncFunction<V, V>() { 3912 @Override 3913 public ListenableFuture<V> apply(V input) { 3914 return immediateFuture(input); 3915 } 3916 }; 3917 } 3918 3919 private static <I, O> AsyncFunction<I, O> tagged( 3920 final String toString, final AsyncFunction<I, O> function) { 3921 return new AsyncFunction<I, O>() { 3922 @Override 3923 public ListenableFuture<O> apply(I input) throws Exception { 3924 return function.apply(input); 3925 } 3926 3927 @Override 3928 public String toString() { 3929 return toString; 3930 } 3931 }; 3932 } 3933 3934 private static <V> AsyncCallable<V> tagged( 3935 final String toString, final AsyncCallable<V> callable) { 3936 return new AsyncCallable<V>() { 3937 @Override 3938 public ListenableFuture<V> call() throws Exception { 3939 return callable.call(); 3940 } 3941 3942 @Override 3943 public String toString() { 3944 return toString; 3945 } 3946 }; 3947 } 3948 } 3949