1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static com.google.common.truth.Truth.assertWithMessage; 21 22 import com.google.common.collect.Iterables; 23 import com.google.common.collect.Range; 24 import com.google.common.collect.Sets; 25 import com.google.common.util.concurrent.internal.InternalFutureFailureAccess; 26 import java.util.ArrayList; 27 import java.util.Collections; 28 import java.util.List; 29 import java.util.Set; 30 import java.util.concurrent.Callable; 31 import java.util.concurrent.CancellationException; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.CyclicBarrier; 34 import java.util.concurrent.ExecutionException; 35 import java.util.concurrent.Executor; 36 import java.util.concurrent.ExecutorService; 37 import java.util.concurrent.Executors; 38 import java.util.concurrent.Future; 39 import java.util.concurrent.TimeUnit; 40 import java.util.concurrent.TimeoutException; 41 import java.util.concurrent.atomic.AtomicBoolean; 42 import java.util.concurrent.atomic.AtomicInteger; 43 import java.util.concurrent.atomic.AtomicReference; 44 import java.util.concurrent.locks.LockSupport; 45 import junit.framework.AssertionFailedError; 46 import junit.framework.TestCase; 47 48 /** 49 * Tests for {@link AbstractFuture}. 
50 * 51 * @author Brian Stoler 52 */ 53 54 public class AbstractFutureTest extends TestCase { testSuccess()55 public void testSuccess() throws ExecutionException, InterruptedException { 56 final Object value = new Object(); 57 assertSame( 58 value, 59 new AbstractFuture<Object>() { 60 { 61 set(value); 62 } 63 }.get()); 64 } 65 testException()66 public void testException() throws InterruptedException { 67 final Throwable failure = new Throwable(); 68 AbstractFuture<String> future = 69 new AbstractFuture<String>() { 70 { 71 setException(failure); 72 } 73 }; 74 75 ExecutionException ee1 = getExpectingExecutionException(future); 76 ExecutionException ee2 = getExpectingExecutionException(future); 77 78 // Ensure we get a unique execution exception on each get 79 assertNotSame(ee1, ee2); 80 81 assertThat(ee1).hasCauseThat().isSameAs(failure); 82 assertThat(ee2).hasCauseThat().isSameAs(failure); 83 84 checkStackTrace(ee1); 85 checkStackTrace(ee2); 86 } 87 testCancel_notDoneNoInterrupt()88 public void testCancel_notDoneNoInterrupt() throws Exception { 89 InterruptibleFuture future = new InterruptibleFuture(); 90 assertTrue(future.cancel(false)); 91 assertTrue(future.isCancelled()); 92 assertTrue(future.isDone()); 93 assertFalse(future.wasInterrupted()); 94 assertFalse(future.interruptTaskWasCalled); 95 try { 96 future.get(); 97 fail("Expected CancellationException"); 98 } catch (CancellationException e) { 99 // See AbstractFutureCancellationCauseTest for how to set causes 100 assertThat(e).hasCauseThat().isNull(); 101 } 102 } 103 testCancel_notDoneInterrupt()104 public void testCancel_notDoneInterrupt() throws Exception { 105 InterruptibleFuture future = new InterruptibleFuture(); 106 assertTrue(future.cancel(true)); 107 assertTrue(future.isCancelled()); 108 assertTrue(future.isDone()); 109 assertTrue(future.wasInterrupted()); 110 assertTrue(future.interruptTaskWasCalled); 111 try { 112 future.get(); 113 fail("Expected CancellationException"); 114 } catch 
(CancellationException e) { 115 // See AbstractFutureCancellationCauseTest for how to set causes 116 assertThat(e).hasCauseThat().isNull(); 117 } 118 } 119 testCancel_done()120 public void testCancel_done() throws Exception { 121 AbstractFuture<String> future = 122 new AbstractFuture<String>() { 123 { 124 set("foo"); 125 } 126 }; 127 assertFalse(future.cancel(true)); 128 assertFalse(future.isCancelled()); 129 assertTrue(future.isDone()); 130 } 131 testGetWithTimeoutDoneFuture()132 public void testGetWithTimeoutDoneFuture() throws Exception { 133 AbstractFuture<String> future = 134 new AbstractFuture<String>() { 135 { 136 set("foo"); 137 } 138 }; 139 assertEquals("foo", future.get(0, TimeUnit.SECONDS)); 140 } 141 testEvilFuture_setFuture()142 public void testEvilFuture_setFuture() throws Exception { 143 final RuntimeException exception = new RuntimeException("you didn't say the magic word!"); 144 AbstractFuture<String> evilFuture = 145 new AbstractFuture<String>() { 146 @Override 147 public void addListener(Runnable r, Executor e) { 148 throw exception; 149 } 150 }; 151 AbstractFuture<String> normalFuture = new AbstractFuture<String>() {}; 152 normalFuture.setFuture(evilFuture); 153 assertTrue(normalFuture.isDone()); 154 try { 155 normalFuture.get(); 156 fail(); 157 } catch (ExecutionException e) { 158 assertThat(e).hasCauseThat().isSameAs(exception); 159 } 160 } 161 testRemoveWaiter_interruption()162 public void testRemoveWaiter_interruption() throws Exception { 163 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 164 WaiterThread waiter1 = new WaiterThread(future); 165 waiter1.start(); 166 waiter1.awaitWaiting(); 167 168 WaiterThread waiter2 = new WaiterThread(future); 169 waiter2.start(); 170 waiter2.awaitWaiting(); 171 // The waiter queue should be waiter2->waiter1 172 173 // This should wake up waiter1 and cause the waiter1 node to be removed. 
174 waiter1.interrupt(); 175 176 waiter1.join(); 177 waiter2.awaitWaiting(); // should still be blocked 178 179 LockSupport.unpark(waiter2); // spurious wakeup 180 waiter2.awaitWaiting(); // should eventually re-park 181 182 future.set(null); 183 waiter2.join(); 184 } 185 testRemoveWaiter_polling()186 public void testRemoveWaiter_polling() throws Exception { 187 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 188 WaiterThread waiter = new WaiterThread(future); 189 waiter.start(); 190 waiter.awaitWaiting(); 191 PollingThread poller = new PollingThread(future); 192 poller.start(); 193 PollingThread poller2 = new PollingThread(future); 194 poller2.start(); 195 PollingThread poller3 = new PollingThread(future); 196 poller3.start(); 197 poller.awaitInLoop(); 198 poller2.awaitInLoop(); 199 poller3.awaitInLoop(); 200 201 // The waiter queue should be {poller x 3}->waiter1 202 waiter.interrupt(); 203 204 // This should wake up waiter1 and cause the waiter1 node to be removed. 
205 waiter.join(); 206 future.set(null); 207 poller.join(); 208 } 209 testToString_allUnique()210 public void testToString_allUnique() throws Exception { 211 // Two futures should not have the same toString, to avoid people asserting on it 212 assertThat(SettableFuture.create().toString()).isNotEqualTo(SettableFuture.create().toString()); 213 } 214 testToString_notDone()215 public void testToString_notDone() throws Exception { 216 AbstractFuture<Object> testFuture = 217 new AbstractFuture<Object>() { 218 @Override 219 public String pendingToString() { 220 return "cause=[Because this test isn't done]"; 221 } 222 }; 223 assertThat(testFuture.toString()) 224 .matches( 225 "[^\\[]+\\[status=PENDING, info=\\[cause=\\[Because this test isn't done\\]\\]\\]"); 226 try { 227 testFuture.get(1, TimeUnit.NANOSECONDS); 228 fail(); 229 } catch (TimeoutException e) { 230 assertThat(e.getMessage()).contains("1 nanoseconds"); 231 assertThat(e.getMessage()).contains("Because this test isn't done"); 232 } 233 } 234 235 /** 236 * This test attempts to cause a future to wait for longer than it was requested to from a timed 237 * get() call. As measurements of time are prone to flakiness, it tries to assert based on ranges 238 * derived from observing how much time actually passed for various operations. 239 */ 240 @SuppressWarnings({"DeprecatedThreadMethods", "ThreadPriorityCheck"}) testToString_delayedTimeout()241 public void testToString_delayedTimeout() throws Exception { 242 TimedWaiterThread thread = 243 new TimedWaiterThread(new AbstractFuture<Object>() {}, 2, TimeUnit.SECONDS); 244 thread.start(); 245 thread.awaitWaiting(); 246 thread.suspend(); 247 // Sleep for enough time to add 1500 milliseconds of overwait to the get() call. 
248 long toWaitMillis = 3500 - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - thread.startTime); 249 Thread.sleep(toWaitMillis); 250 thread.setPriority(Thread.MAX_PRIORITY); 251 thread.resume(); 252 thread.join(); 253 // It's possible to race and suspend the thread just before the park call actually takes effect, 254 // causing the thread to be suspended for 3.5 seconds, and then park itself for 2 seconds after 255 // being resumed. To avoid a flake in this scenario, calculate how long that thread actually 256 // waited and assert based on that time. Empirically, the race where the thread ends up waiting 257 // for 5.5 seconds happens about 2% of the time. 258 boolean longWait = TimeUnit.NANOSECONDS.toSeconds(thread.timeSpentBlocked) >= 5; 259 // Count how long it actually took to return; we'll accept any number between the expected delay 260 // and the approximate actual delay, to be robust to variance in thread scheduling. 261 char overWaitNanosFirstDigit = 262 Long.toString( 263 thread.timeSpentBlocked - TimeUnit.MILLISECONDS.toNanos(longWait ? 5000 : 3000)) 264 .charAt(0); 265 if (overWaitNanosFirstDigit < '4') { 266 overWaitNanosFirstDigit = '9'; 267 } 268 String nanosRegex = "[4-" + overWaitNanosFirstDigit + "][0-9]+"; 269 assertWithMessage( 270 "Spent " + thread.timeSpentBlocked + " ns blocked; slept for " + toWaitMillis + " ms") 271 .that(thread.exception) 272 .hasMessageThat() 273 .matches( 274 "Waited 2 seconds \\(plus " 275 + (longWait ? 
"3" : "1") 276 + " seconds, " 277 + nanosRegex 278 + " nanoseconds delay\\).*"); 279 } 280 testToString_completed()281 public void testToString_completed() throws Exception { 282 AbstractFuture<Object> testFuture2 = 283 new AbstractFuture<Object>() { 284 @Override 285 public String pendingToString() { 286 return "cause=[Someday...]"; 287 } 288 }; 289 AbstractFuture<Object> testFuture3 = new AbstractFuture<Object>() {}; 290 testFuture3.setFuture(testFuture2); 291 assertThat(testFuture3.toString()) 292 .matches( 293 "[^\\[]+\\[status=PENDING, info=\\[setFuture=" 294 + "\\[[^\\[]+\\[status=PENDING, info=\\[cause=\\[Someday...\\]\\]\\]\\]\\]\\]"); 295 testFuture2.set("result string"); 296 assertThat(testFuture3.toString()) 297 .matches("[^\\[]+\\[status=SUCCESS, result=\\[result string\\]\\]"); 298 } 299 testToString_cancelled()300 public void testToString_cancelled() throws Exception { 301 assertThat(Futures.immediateCancelledFuture().toString()) 302 .matches("[^\\[]+\\[status=CANCELLED\\]"); 303 } 304 testToString_failed()305 public void testToString_failed() { 306 assertThat(Futures.immediateFailedFuture(new RuntimeException("foo")).toString()) 307 .matches("[^\\[]+\\[status=FAILURE, cause=\\[java.lang.RuntimeException: foo\\]\\]"); 308 } 309 testToString_misbehaving()310 public void testToString_misbehaving() throws Exception { 311 assertThat( 312 new AbstractFuture<Object>() { 313 @Override 314 public String pendingToString() { 315 throw new RuntimeException("I'm a misbehaving implementation"); 316 } 317 }.toString()) 318 .matches( 319 "[^\\[]+\\[status=PENDING, info=\\[Exception thrown from implementation: " 320 + "class java.lang.RuntimeException\\]\\]"); 321 } 322 testCompletionFinishesWithDone()323 public void testCompletionFinishesWithDone() { 324 ExecutorService executor = Executors.newFixedThreadPool(10); 325 for (int i = 0; i < 50000; i++) { 326 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 327 final AtomicReference<String> 
errorMessage = Atomics.newReference(); 328 executor.execute( 329 new Runnable() { 330 @Override 331 public void run() { 332 future.set("success"); 333 if (!future.isDone()) { 334 errorMessage.set("Set call exited before future was complete."); 335 } 336 } 337 }); 338 executor.execute( 339 new Runnable() { 340 @Override 341 public void run() { 342 future.setException(new IllegalArgumentException("failure")); 343 if (!future.isDone()) { 344 errorMessage.set("SetException call exited before future was complete."); 345 } 346 } 347 }); 348 executor.execute( 349 new Runnable() { 350 @Override 351 public void run() { 352 future.cancel(true); 353 if (!future.isDone()) { 354 errorMessage.set("Cancel call exited before future was complete."); 355 } 356 } 357 }); 358 try { 359 future.get(); 360 } catch (Throwable t) { 361 // Ignore, we just wanted to block. 362 } 363 String error = errorMessage.get(); 364 assertNull(error, error); 365 } 366 executor.shutdown(); 367 } 368 369 /** 370 * He did the bash, he did the future bash The future bash, it was a concurrency smash He did the 371 * bash, it caught on in a flash He did the bash, he did the future bash 372 */ 373 testFutureBash()374 public void testFutureBash() { 375 final CyclicBarrier barrier = 376 new CyclicBarrier( 377 6 // for the setter threads 378 + 50 // for the listeners 379 + 50 // for the blocking get threads, 380 + 1); // for the main thread 381 final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); 382 final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference(); 383 final AtomicInteger numSuccessfulSetCalls = new AtomicInteger(); 384 Callable<Void> completeSucessFullyRunnable = 385 new Callable<Void>() { 386 @Override 387 public Void call() { 388 if (currentFuture.get().set("set")) { 389 numSuccessfulSetCalls.incrementAndGet(); 390 } 391 awaitUnchecked(barrier); 392 return null; 393 } 394 }; 395 Callable<Void> completeExceptionallyRunnable = 396 new 
Callable<Void>() { 397 Exception failureCause = new Exception("setException"); 398 399 @Override 400 public Void call() { 401 if (currentFuture.get().setException(failureCause)) { 402 numSuccessfulSetCalls.incrementAndGet(); 403 } 404 awaitUnchecked(barrier); 405 return null; 406 } 407 }; 408 Callable<Void> cancelRunnable = 409 new Callable<Void>() { 410 @Override 411 public Void call() { 412 if (currentFuture.get().cancel(true)) { 413 numSuccessfulSetCalls.incrementAndGet(); 414 } 415 awaitUnchecked(barrier); 416 return null; 417 } 418 }; 419 Callable<Void> setFutureCompleteSucessFullyRunnable = 420 new Callable<Void>() { 421 ListenableFuture<String> future = Futures.immediateFuture("setFuture"); 422 423 @Override 424 public Void call() { 425 if (currentFuture.get().setFuture(future)) { 426 numSuccessfulSetCalls.incrementAndGet(); 427 } 428 awaitUnchecked(barrier); 429 return null; 430 } 431 }; 432 Callable<Void> setFutureCompleteExceptionallyRunnable = 433 new Callable<Void>() { 434 ListenableFuture<String> future = 435 Futures.immediateFailedFuture(new Exception("setFuture")); 436 437 @Override 438 public Void call() { 439 if (currentFuture.get().setFuture(future)) { 440 numSuccessfulSetCalls.incrementAndGet(); 441 } 442 awaitUnchecked(barrier); 443 return null; 444 } 445 }; 446 Callable<Void> setFutureCancelRunnable = 447 new Callable<Void>() { 448 ListenableFuture<String> future = Futures.immediateCancelledFuture(); 449 450 @Override 451 public Void call() { 452 if (currentFuture.get().setFuture(future)) { 453 numSuccessfulSetCalls.incrementAndGet(); 454 } 455 awaitUnchecked(barrier); 456 return null; 457 } 458 }; 459 final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); 460 Runnable collectResultsRunnable = 461 new Runnable() { 462 @Override 463 public void run() { 464 try { 465 String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); 466 finalResults.add(result); 467 } catch (ExecutionException e) { 468 
finalResults.add(e.getCause()); 469 } catch (CancellationException e) { 470 finalResults.add(CancellationException.class); 471 } finally { 472 awaitUnchecked(barrier); 473 } 474 } 475 }; 476 Runnable collectResultsTimedGetRunnable = 477 new Runnable() { 478 @Override 479 public void run() { 480 Future<String> future = currentFuture.get(); 481 while (true) { 482 try { 483 String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS); 484 finalResults.add(result); 485 break; 486 } catch (ExecutionException e) { 487 finalResults.add(e.getCause()); 488 break; 489 } catch (CancellationException e) { 490 finalResults.add(CancellationException.class); 491 break; 492 } catch (TimeoutException e) { 493 // loop 494 } 495 } 496 awaitUnchecked(barrier); 497 } 498 }; 499 List<Callable<?>> allTasks = new ArrayList<>(); 500 allTasks.add(completeSucessFullyRunnable); 501 allTasks.add(completeExceptionallyRunnable); 502 allTasks.add(cancelRunnable); 503 allTasks.add(setFutureCompleteSucessFullyRunnable); 504 allTasks.add(setFutureCompleteExceptionallyRunnable); 505 allTasks.add(setFutureCancelRunnable); 506 for (int k = 0; k < 50; k++) { 507 // For each listener we add a task that submits it to the executor directly for the blocking 508 // get usecase and another task that adds it as a listener to the future to exercise both 509 // racing addListener calls and addListener calls completing after the future completes. 510 final Runnable listener = 511 k % 2 == 0 ? 
collectResultsRunnable : collectResultsTimedGetRunnable; 512 allTasks.add(Executors.callable(listener)); 513 allTasks.add( 514 new Callable<Void>() { 515 @Override 516 public Void call() throws Exception { 517 currentFuture.get().addListener(listener, executor); 518 return null; 519 } 520 }); 521 } 522 assertEquals(allTasks.size() + 1, barrier.getParties()); 523 for (int i = 0; i < 1000; i++) { 524 Collections.shuffle(allTasks); 525 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 526 currentFuture.set(future); 527 for (Callable<?> task : allTasks) { 528 @SuppressWarnings("unused") // go/futurereturn-lsc 529 Future<?> possiblyIgnoredError = executor.submit(task); 530 } 531 awaitUnchecked(barrier); 532 assertThat(future.isDone()).isTrue(); 533 // inspect state and ensure it is correct! 534 // asserts that all get calling threads received the same value 535 Object result = Iterables.getOnlyElement(finalResults); 536 if (result == CancellationException.class) { 537 assertTrue(future.isCancelled()); 538 if (future.wasInterrupted()) { 539 // We were cancelled, it is possible that setFuture could have succeeded too. 540 assertThat(numSuccessfulSetCalls.get()).isIn(Range.closed(1, 2)); 541 } else { 542 assertThat(numSuccessfulSetCalls.get()).isEqualTo(1); 543 } 544 } else { 545 assertThat(numSuccessfulSetCalls.get()).isEqualTo(1); 546 } 547 // reset for next iteration 548 numSuccessfulSetCalls.set(0); 549 finalResults.clear(); 550 } 551 executor.shutdown(); 552 } 553 554 // setFuture and cancel() interact in more complicated ways than the other setters. 
testSetFutureCancelBash()555 public void testSetFutureCancelBash() { 556 final int size = 50; 557 final CyclicBarrier barrier = 558 new CyclicBarrier( 559 2 // for the setter threads 560 + size // for the listeners 561 + size // for the get threads, 562 + 1); // for the main thread 563 final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); 564 final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference(); 565 final AtomicReference<AbstractFuture<String>> setFutureFuture = Atomics.newReference(); 566 final AtomicBoolean setFutureSetSucess = new AtomicBoolean(); 567 final AtomicBoolean setFutureCompletionSucess = new AtomicBoolean(); 568 final AtomicBoolean cancellationSucess = new AtomicBoolean(); 569 Runnable cancelRunnable = 570 new Runnable() { 571 @Override 572 public void run() { 573 cancellationSucess.set(currentFuture.get().cancel(true)); 574 awaitUnchecked(barrier); 575 } 576 }; 577 Runnable setFutureCompleteSucessFullyRunnable = 578 new Runnable() { 579 @Override 580 public void run() { 581 AbstractFuture<String> future = setFutureFuture.get(); 582 setFutureSetSucess.set(currentFuture.get().setFuture(future)); 583 setFutureCompletionSucess.set(future.set("hello-async-world")); 584 awaitUnchecked(barrier); 585 } 586 }; 587 final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); 588 Runnable collectResultsRunnable = 589 new Runnable() { 590 @Override 591 public void run() { 592 try { 593 String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); 594 finalResults.add(result); 595 } catch (ExecutionException e) { 596 finalResults.add(e.getCause()); 597 } catch (CancellationException e) { 598 finalResults.add(CancellationException.class); 599 } finally { 600 awaitUnchecked(barrier); 601 } 602 } 603 }; 604 Runnable collectResultsTimedGetRunnable = 605 new Runnable() { 606 @Override 607 public void run() { 608 Future<String> future = currentFuture.get(); 609 while 
(true) { 610 try { 611 String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS); 612 finalResults.add(result); 613 break; 614 } catch (ExecutionException e) { 615 finalResults.add(e.getCause()); 616 break; 617 } catch (CancellationException e) { 618 finalResults.add(CancellationException.class); 619 break; 620 } catch (TimeoutException e) { 621 // loop 622 } 623 } 624 awaitUnchecked(barrier); 625 } 626 }; 627 List<Runnable> allTasks = new ArrayList<>(); 628 allTasks.add(cancelRunnable); 629 allTasks.add(setFutureCompleteSucessFullyRunnable); 630 for (int k = 0; k < size; k++) { 631 // For each listener we add a task that submits it to the executor directly for the blocking 632 // get usecase and another task that adds it as a listener to the future to exercise both 633 // racing addListener calls and addListener calls completing after the future completes. 634 final Runnable listener = 635 k % 2 == 0 ? collectResultsRunnable : collectResultsTimedGetRunnable; 636 allTasks.add(listener); 637 allTasks.add( 638 new Runnable() { 639 @Override 640 public void run() { 641 currentFuture.get().addListener(listener, executor); 642 } 643 }); 644 } 645 assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check 646 for (int i = 0; i < 1000; i++) { 647 Collections.shuffle(allTasks); 648 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 649 final AbstractFuture<String> setFuture = new AbstractFuture<String>() {}; 650 currentFuture.set(future); 651 setFutureFuture.set(setFuture); 652 for (Runnable task : allTasks) { 653 executor.execute(task); 654 } 655 awaitUnchecked(barrier); 656 assertThat(future.isDone()).isTrue(); 657 // inspect state and ensure it is correct! 
658 // asserts that all get calling threads received the same value 659 Object result = Iterables.getOnlyElement(finalResults); 660 if (result == CancellationException.class) { 661 assertTrue(future.isCancelled()); 662 assertTrue(cancellationSucess.get()); 663 // cancellation can interleave in 3 ways 664 // 1. prior to setFuture 665 // 2. after setFuture before set() on the future assigned 666 // 3. after setFuture and set() are called but before the listener completes. 667 if (!setFutureSetSucess.get() || !setFutureCompletionSucess.get()) { 668 // If setFuture fails or set on the future fails then it must be because that future was 669 // cancelled 670 assertTrue(setFuture.isCancelled()); 671 assertTrue(setFuture.wasInterrupted()); // we only call cancel(true) 672 } 673 } else { 674 // set on the future completed 675 assertFalse(cancellationSucess.get()); 676 assertTrue(setFutureSetSucess.get()); 677 assertTrue(setFutureCompletionSucess.get()); 678 } 679 // reset for next iteration 680 setFutureSetSucess.set(false); 681 setFutureCompletionSucess.set(false); 682 cancellationSucess.set(false); 683 finalResults.clear(); 684 } 685 executor.shutdown(); 686 } 687 688 // Test to ensure that when calling setFuture with a done future only setFuture or cancel can 689 // return true. 
testSetFutureCancelBash_withDoneFuture()690 public void testSetFutureCancelBash_withDoneFuture() { 691 final CyclicBarrier barrier = 692 new CyclicBarrier( 693 2 // for the setter threads 694 + 1 // for the blocking get thread, 695 + 1); // for the main thread 696 final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); 697 final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference(); 698 final AtomicBoolean setFutureSuccess = new AtomicBoolean(); 699 final AtomicBoolean cancellationSucess = new AtomicBoolean(); 700 Callable<Void> cancelRunnable = 701 new Callable<Void>() { 702 @Override 703 public Void call() { 704 cancellationSucess.set(currentFuture.get().cancel(true)); 705 awaitUnchecked(barrier); 706 return null; 707 } 708 }; 709 Callable<Void> setFutureCompleteSucessFullyRunnable = 710 new Callable<Void>() { 711 final ListenableFuture<String> future = Futures.immediateFuture("hello"); 712 713 @Override 714 public Void call() { 715 setFutureSuccess.set(currentFuture.get().setFuture(future)); 716 awaitUnchecked(barrier); 717 return null; 718 } 719 }; 720 final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); 721 final Runnable collectResultsRunnable = 722 new Runnable() { 723 @Override 724 public void run() { 725 try { 726 String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); 727 finalResults.add(result); 728 } catch (ExecutionException e) { 729 finalResults.add(e.getCause()); 730 } catch (CancellationException e) { 731 finalResults.add(CancellationException.class); 732 } finally { 733 awaitUnchecked(barrier); 734 } 735 } 736 }; 737 List<Callable<?>> allTasks = new ArrayList<>(); 738 allTasks.add(cancelRunnable); 739 allTasks.add(setFutureCompleteSucessFullyRunnable); 740 allTasks.add(Executors.callable(collectResultsRunnable)); 741 assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check 742 for (int i = 0; i < 1000; i++) { 743 
Collections.shuffle(allTasks); 744 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 745 currentFuture.set(future); 746 for (Callable<?> task : allTasks) { 747 @SuppressWarnings("unused") // go/futurereturn-lsc 748 Future<?> possiblyIgnoredError = executor.submit(task); 749 } 750 awaitUnchecked(barrier); 751 assertThat(future.isDone()).isTrue(); 752 // inspect state and ensure it is correct! 753 // asserts that all get calling threads received the same value 754 Object result = Iterables.getOnlyElement(finalResults); 755 if (result == CancellationException.class) { 756 assertTrue(future.isCancelled()); 757 assertTrue(cancellationSucess.get()); 758 assertFalse(setFutureSuccess.get()); 759 } else { 760 assertTrue(setFutureSuccess.get()); 761 assertFalse(cancellationSucess.get()); 762 } 763 // reset for next iteration 764 setFutureSuccess.set(false); 765 cancellationSucess.set(false); 766 finalResults.clear(); 767 } 768 executor.shutdown(); 769 } 770 771 // In a previous implementation this would cause a stack overflow after ~2000 futures chained 772 // together. 
// Now it should only be limited by available memory (and time)
/**
 * Chains 100000 futures through {@code setFuture}, completes the innermost one, and checks that
 * the outermost future becomes done.
 */
public void testSetFuture_stackOverflow() {
  SettableFuture<String> orig = SettableFuture.create();
  SettableFuture<String> prev = orig;
  for (int i = 0; i < 100000; i++) {
    SettableFuture<String> curr = SettableFuture.create();
    prev.setFuture(curr);
    prev = curr;
  }
  // prev represents the 'innermost' future
  prev.set("done");
  assertTrue(orig.isDone());
}

/**
 * The delegate reports done and not cancelled, yet throws {@link CancellationException} from
 * {@code get}. The outer future is expected to fail with an {@link IllegalArgumentException}
 * whose message contains the delegate's {@code toString()}.
 */
public void testSetFuture_misbehavingFutureThrows() throws Exception {
  SettableFuture<String> future = SettableFuture.create();
  ListenableFuture<String> badFuture =
      new ListenableFuture<String>() {
        @Override
        public boolean cancel(boolean interrupt) {
          return false;
        }

        @Override
        public boolean isDone() {
          return true;
        }

        @Override
        public boolean isCancelled() {
          return false; // BAD!!
        }

        @Override
        public String get() {
          throw new CancellationException(); // BAD!!
        }

        @Override
        public String get(long time, TimeUnit unit) {
          throw new CancellationException(); // BAD!!
        }

        @Override
        public void addListener(Runnable runnable, Executor executor) {
          executor.execute(runnable);
        }
      };
  future.setFuture(badFuture);
  ExecutionException expected = getExpectingExecutionException(future);
  assertThat(expected).hasCauseThat().isInstanceOf(IllegalArgumentException.class);
  assertThat(expected).hasCauseThat().hasMessageThat().contains(badFuture.toString());
}

/**
 * The delegate claims to be cancelled, yet returns a value from {@code get}. The outer future is
 * expected to end up cancelled.
 */
public void testSetFuture_misbehavingFutureDoesNotThrow() throws Exception {
  SettableFuture<String> future = SettableFuture.create();
  ListenableFuture<String> badFuture =
      new ListenableFuture<String>() {
        @Override
        public boolean cancel(boolean interrupt) {
          return false;
        }

        @Override
        public boolean isDone() {
          return true;
        }

        @Override
        public boolean isCancelled() {
          return true; // BAD!!
        }

        @Override
        public String get() {
          return "foo"; // BAD!!
        }

        @Override
        public String get(long time, TimeUnit unit) {
          return "foo"; // BAD!!
        }

        @Override
        public void addListener(Runnable runnable, Executor executor) {
          executor.execute(runnable);
        }
      };
  future.setFuture(badFuture);
  assertThat(future.isCancelled()).isTrue();
}

/**
 * Chains 100000 futures through {@code setFuture}, cancels the outermost one with interruption,
 * and checks that the innermost future is cancelled and records the interrupt.
 */
public void testCancel_stackOverflow() {
  SettableFuture<String> orig = SettableFuture.create();
  SettableFuture<String> prev = orig;
  for (int i = 0; i < 100000; i++) {
    SettableFuture<String> curr = SettableFuture.create();
    prev.setFuture(curr);
    prev = curr;
  }
  // orig is the 'outermost future', this should propagate fully down the stack of futures.
  orig.cancel(true);
  assertTrue(orig.isCancelled());
  assertTrue(prev.isCancelled());
  assertTrue(prev.wasInterrupted());
}

/** A future that was passed to its own {@code setFuture} can still be cancelled. */
public void testSetFutureSelf_cancel() {
  SettableFuture<String> orig = SettableFuture.create();
  orig.setFuture(orig);
  orig.cancel(true);
  assertTrue(orig.isCancelled());
}

/** A future delegating to itself renders the delegate as "this future" in {@code toString()}. */
public void testSetFutureSelf_toString() {
  SettableFuture<String> orig = SettableFuture.create();
  orig.setFuture(orig);
  assertThat(orig.toString()).contains("[status=PENDING, info=[setFuture=[this future]]]");
}

/** A future whose result is itself renders the result as "this future" in {@code toString()}. */
public void testSetSelf_toString() {
  SettableFuture<Object> orig = SettableFuture.create();
  orig.set(orig);
  assertThat(orig.toString()).contains("[status=SUCCESS, result=[this future]]");
}

/**
 * When the result's own {@code toString()} calls back into the future's {@code toString()}, the
 * test expects a {@link StackOverflowError}.
 */
public void testSetIndirectSelf_toString() {
  final SettableFuture<Object> orig = SettableFuture.create();
  // unlike the above this indirection defeats the trivial cycle detection and causes a SOE
  orig.set(
      new Object() {
        @Override
        public String toString() {
          return orig.toString();
        }
      });
  try {
    orig.toString();
    fail();
  } catch (StackOverflowError expected) {
    // Expected: the indirect self-reference recurses until the stack overflows.
  }
}

// Regression test for a case where we would fail to execute listeners immediately on done futures
// this would be observable from an afterDone callback
public void testListenersExecuteImmediately_fromAfterDone() {
  AbstractFuture<String> f =
      new AbstractFuture<String>() {
        @Override
        protected void afterDone() {
          final AtomicBoolean ranImmediately = new AtomicBoolean();
          addListener(
              new Runnable() {
                @Override
                public void run() {
                  ranImmediately.set(true);
                }
              },
              MoreExecutors.directExecutor());
          // With a direct executor the listener must already have run by the time
          // addListener returns.
          assertThat(ranImmediately.get()).isTrue();
        }
      };
  f.set("foo");
}

// Regression test for a case where we would fail to execute listeners immediately on done futures
// this would be observable from a waiter that was just unblocked.
public void testListenersExecuteImmediately_afterWaiterWakesUp() throws Exception {
  final AbstractFuture<String> f =
      new AbstractFuture<String>() {
        @Override
        protected void afterDone() {
          // this simply delays executing listeners
          try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(10));
          } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt(); // preserve status
          }
        }
      };
  Thread t =
      new Thread() {
        @Override
        public void run() {
          f.set("foo");
        }
      };
  t.start();
  // Blocks until the setter thread has completed the future.
  f.get();
  final AtomicBoolean ranImmediately = new AtomicBoolean();
  f.addListener(
      new Runnable() {
        @Override
        public void run() {
          ranImmediately.set(true);
        }
      },
      MoreExecutors.directExecutor());
  assertThat(ranImmediately.get()).isTrue();
  // Cut the 10 second sleep in afterDone short so the setter thread can be joined promptly.
  t.interrupt();
  t.join();
}

/** A successfully completed {@link SettableFuture} reports no fast-path failure. */
public void testTrustedGetFailure_Completed() {
  SettableFuture<String> future = SettableFuture.create();
  future.set("261");
  assertThat(future.tryInternalFastPathGetFailure()).isNull();
}

/** A failed {@link SettableFuture} exposes its failure through the fast path. */
public void testTrustedGetFailure_Failed() {
  SettableFuture<String> future = SettableFuture.create();
  Throwable failure = new Throwable();
  future.setException(failure);
  assertThat(future.tryInternalFastPathGetFailure()).isEqualTo(failure);
}

/** A pending {@link SettableFuture} reports no fast-path failure. */
public void testTrustedGetFailure_NotCompleted() {
  SettableFuture<String> future = SettableFuture.create();
  assertThat(future.isDone()).isFalse();
  assertThat(future.tryInternalFastPathGetFailure()).isNull();
}

/** A {@link SettableFuture} cancelled without interruption reports no fast-path failure. */
public void testTrustedGetFailure_CanceledNoCause() {
  SettableFuture<String> future = SettableFuture.create();
  future.cancel(false);
  assertThat(future.tryInternalFastPathGetFailure()).isNull();
}

/** A successfully completed anonymous {@link AbstractFuture} reports no fast-path failure. */
public void testGetFailure_Completed() {
  AbstractFuture<String> future = new AbstractFuture<String>() {};
  future.set("261");
  assertThat(future.tryInternalFastPathGetFailure()).isNull();
}

/**
 * Unlike the {@link SettableFuture} case, a failed anonymous {@link AbstractFuture} is expected
 * to return null from the fast path.
 */
public void testGetFailure_Failed() {
  AbstractFuture<String> future = new AbstractFuture<String>() {};
  final Throwable failure = new Throwable();
  future.setException(failure);
  assertThat(future.tryInternalFastPathGetFailure()).isNull();
}

/** A pending anonymous {@link AbstractFuture} reports no fast-path failure. */
public void testGetFailure_NotCompleted() {
  AbstractFuture<String> future = new AbstractFuture<String>() {};
  assertThat(future.isDone()).isFalse();
  assertThat(future.tryInternalFastPathGetFailure()).isNull();
}

/** An anonymous {@link AbstractFuture} cancelled without interruption reports no fast-path failure. */
public void testGetFailure_CanceledNoCause() {
  AbstractFuture<String> future = new AbstractFuture<String>() {};
  future.cancel(false);
  assertThat(future.tryInternalFastPathGetFailure()).isNull();
}

public void testForwardExceptionFastPath() throws Exception {
  class FailFuture extends InternalFutureFailureAccess implements ListenableFuture<String> {
    Throwable failure;

    FailFuture(Throwable throwable) {
      failure = throwable;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      throw new AssertionFailedError("cancel shouldn't be called on this object");
    }

    @Override
    public boolean isCancelled() {
      return false;
    }

    @Override
    public boolean isDone() {
      return true;
    }

    @Override
    public String get() throws InterruptedException, ExecutionException {
throw new AssertionFailedError("get() shouldn't be called on this object"); 1051 } 1052 1053 @Override 1054 public String get(long timeout, TimeUnit unit) 1055 throws InterruptedException, ExecutionException, TimeoutException { 1056 return get(); 1057 } 1058 1059 @Override 1060 protected Throwable tryInternalFastPathGetFailure() { 1061 return failure; 1062 } 1063 1064 @Override 1065 public void addListener(Runnable listener, Executor executor) { 1066 throw new AssertionFailedError("addListener() shouldn't be called on this object"); 1067 } 1068 } 1069 1070 final RuntimeException exception = new RuntimeException("you still didn't say the magic word!"); 1071 SettableFuture<String> normalFuture = SettableFuture.create(); 1072 normalFuture.setFuture(new FailFuture(exception)); 1073 assertTrue(normalFuture.isDone()); 1074 try { 1075 normalFuture.get(); 1076 fail(); 1077 } catch (ExecutionException e) { 1078 assertSame(exception, e.getCause()); 1079 } 1080 } 1081 awaitUnchecked(final CyclicBarrier barrier)1082 private static void awaitUnchecked(final CyclicBarrier barrier) { 1083 try { 1084 barrier.await(); 1085 } catch (Exception e) { 1086 throw new RuntimeException(e); 1087 } 1088 } 1089 checkStackTrace(ExecutionException e)1090 private void checkStackTrace(ExecutionException e) { 1091 // Our call site for get() should be in the trace. 1092 int index = findStackFrame(e, getClass().getName(), "getExpectingExecutionException"); 1093 1094 assertThat(index).isNotEqualTo(0); 1095 1096 // Above our method should be the call to get(). Don't assert on the class 1097 // because it could be some superclass. 
1098 assertThat(e.getStackTrace()[index - 1].getMethodName()).isEqualTo("get"); 1099 } 1100 findStackFrame(ExecutionException e, String clazz, String method)1101 private static int findStackFrame(ExecutionException e, String clazz, String method) { 1102 StackTraceElement[] elements = e.getStackTrace(); 1103 for (int i = 0; i < elements.length; i++) { 1104 StackTraceElement element = elements[i]; 1105 if (element.getClassName().equals(clazz) && element.getMethodName().equals(method)) { 1106 return i; 1107 } 1108 } 1109 AssertionFailedError failure = 1110 new AssertionFailedError( 1111 "Expected element " + clazz + "." + method + " not found in stack trace"); 1112 failure.initCause(e); 1113 throw failure; 1114 } 1115 getExpectingExecutionException(AbstractFuture<String> future)1116 private ExecutionException getExpectingExecutionException(AbstractFuture<String> future) 1117 throws InterruptedException { 1118 try { 1119 String got = future.get(); 1120 fail("Expected exception but got " + got); 1121 } catch (ExecutionException e) { 1122 return e; 1123 } 1124 1125 // unreachable, but compiler doesn't know that fail() always throws 1126 return null; 1127 } 1128 1129 private static final class WaiterThread extends Thread { 1130 private final AbstractFuture<?> future; 1131 WaiterThread(AbstractFuture<?> future)1132 private WaiterThread(AbstractFuture<?> future) { 1133 this.future = future; 1134 } 1135 1136 @Override run()1137 public void run() { 1138 try { 1139 future.get(); 1140 } catch (Exception e) { 1141 // nothing 1142 } 1143 } 1144 awaitWaiting()1145 void awaitWaiting() { 1146 while (!isBlocked()) { 1147 if (getState() == State.TERMINATED) { 1148 throw new RuntimeException("Thread exited"); 1149 } 1150 Thread.yield(); 1151 } 1152 } 1153 isBlocked()1154 private boolean isBlocked() { 1155 return getState() == Thread.State.WAITING && LockSupport.getBlocker(this) == future; 1156 } 1157 } 1158 1159 static final class TimedWaiterThread extends Thread { 1160 private final 
AbstractFuture<?> future; 1161 private final long timeout; 1162 private final TimeUnit unit; 1163 private Exception exception; 1164 private volatile long startTime; 1165 private long timeSpentBlocked; 1166 TimedWaiterThread(AbstractFuture<?> future, long timeout, TimeUnit unit)1167 TimedWaiterThread(AbstractFuture<?> future, long timeout, TimeUnit unit) { 1168 this.future = future; 1169 this.timeout = timeout; 1170 this.unit = unit; 1171 } 1172 1173 @Override run()1174 public void run() { 1175 startTime = System.nanoTime(); 1176 try { 1177 future.get(timeout, unit); 1178 } catch (Exception e) { 1179 // nothing 1180 exception = e; 1181 } finally { 1182 timeSpentBlocked = System.nanoTime() - startTime; 1183 } 1184 } 1185 awaitWaiting()1186 void awaitWaiting() { 1187 while (!isBlocked()) { 1188 if (getState() == State.TERMINATED) { 1189 throw new RuntimeException("Thread exited"); 1190 } 1191 Thread.yield(); 1192 } 1193 } 1194 isBlocked()1195 private boolean isBlocked() { 1196 return getState() == Thread.State.TIMED_WAITING && LockSupport.getBlocker(this) == future; 1197 } 1198 } 1199 1200 private static final class PollingThread extends Thread { 1201 private final AbstractFuture<?> future; 1202 private final CountDownLatch completedIteration = new CountDownLatch(10); 1203 PollingThread(AbstractFuture<?> future)1204 private PollingThread(AbstractFuture<?> future) { 1205 this.future = future; 1206 } 1207 1208 @Override run()1209 public void run() { 1210 while (true) { 1211 try { 1212 future.get(0, TimeUnit.SECONDS); 1213 return; 1214 } catch (InterruptedException | ExecutionException e) { 1215 return; 1216 } catch (TimeoutException e) { 1217 // do nothing 1218 } finally { 1219 completedIteration.countDown(); 1220 } 1221 } 1222 } 1223 awaitInLoop()1224 void awaitInLoop() { 1225 Uninterruptibles.awaitUninterruptibly(completedIteration); 1226 } 1227 } 1228 1229 private static final class InterruptibleFuture extends AbstractFuture<String> { 1230 boolean 
interruptTaskWasCalled; 1231 1232 @Override interruptTask()1233 protected void interruptTask() { 1234 assertFalse(interruptTaskWasCalled); 1235 interruptTaskWasCalled = true; 1236 } 1237 } 1238 } 1239