1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.StandardSystemProperty.JAVA_SPECIFICATION_VERSION; 20 import static com.google.common.base.StandardSystemProperty.OS_NAME; 21 import static com.google.common.truth.Truth.assertThat; 22 import static com.google.common.truth.Truth.assertWithMessage; 23 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 24 import static org.junit.Assert.assertThrows; 25 26 import com.google.common.annotations.GwtIncompatible; 27 import com.google.common.collect.Iterables; 28 import com.google.common.collect.Range; 29 import com.google.common.collect.Sets; 30 import com.google.common.primitives.Ints; 31 import com.google.common.util.concurrent.internal.InternalFutureFailureAccess; 32 import java.util.ArrayList; 33 import java.util.Arrays; 34 import java.util.Collections; 35 import java.util.List; 36 import java.util.Set; 37 import java.util.concurrent.Callable; 38 import java.util.concurrent.CancellationException; 39 import java.util.concurrent.CountDownLatch; 40 import java.util.concurrent.CyclicBarrier; 41 import java.util.concurrent.ExecutionException; 42 import java.util.concurrent.Executor; 43 import java.util.concurrent.ExecutorService; 44 import java.util.concurrent.Executors; 45 import java.util.concurrent.Future; 46 import java.util.concurrent.TimeUnit; 47 import java.util.concurrent.TimeoutException; 48 import java.util.concurrent.atomic.AtomicBoolean; 49 import java.util.concurrent.atomic.AtomicInteger; 50 import java.util.concurrent.atomic.AtomicReference; 51 import java.util.concurrent.locks.LockSupport; 52 import junit.framework.AssertionFailedError; 53 import junit.framework.TestCase; 54 import org.checkerframework.checker.nullness.qual.Nullable; 55 56 /** 57 * Tests for {@link AbstractFuture}. 58 * 59 * @author Brian Stoler 60 */ 61 public class AbstractFutureTest extends TestCase { testSuccess()62 public void testSuccess() throws ExecutionException, InterruptedException { 63 final Object value = new Object(); 64 assertSame( 65 value, 66 new AbstractFuture<Object>() { 67 { 68 set(value); 69 } 70 }.get()); 71 } 72 testException()73 public void testException() throws InterruptedException { 74 final Throwable failure = new Throwable(); 75 AbstractFuture<String> future = 76 new AbstractFuture<String>() { 77 { 78 setException(failure); 79 } 80 }; 81 82 ExecutionException ee1 = getExpectingExecutionException(future); 83 ExecutionException ee2 = getExpectingExecutionException(future); 84 85 // Ensure we get a unique execution exception on each get 86 assertNotSame(ee1, ee2); 87 88 assertThat(ee1).hasCauseThat().isSameInstanceAs(failure); 89 assertThat(ee2).hasCauseThat().isSameInstanceAs(failure); 90 91 checkStackTrace(ee1); 92 checkStackTrace(ee2); 93 } 94 testCancel_notDoneNoInterrupt()95 public void testCancel_notDoneNoInterrupt() throws Exception { 96 InterruptibleFuture future = new InterruptibleFuture(); 97 assertTrue(future.cancel(false)); 98 assertTrue(future.isCancelled()); 99 assertTrue(future.isDone()); 100 assertFalse(future.wasInterrupted()); 101 assertFalse(future.interruptTaskWasCalled); 102 CancellationException e = assertThrows(CancellationException.class, () -> future.get()); 103 assertThat(e).hasCauseThat().isNull(); 104 } 105 testCancel_notDoneInterrupt()106 public void testCancel_notDoneInterrupt() throws Exception { 107 InterruptibleFuture future = new InterruptibleFuture(); 108 assertTrue(future.cancel(true)); 109 assertTrue(future.isCancelled()); 110 assertTrue(future.isDone()); 111 assertTrue(future.wasInterrupted()); 112 assertTrue(future.interruptTaskWasCalled); 113 CancellationException e = assertThrows(CancellationException.class, () -> future.get()); 114 assertThat(e).hasCauseThat().isNull(); 115 } 116 testCancel_done()117 public void testCancel_done() throws Exception { 118 AbstractFuture<String> future = 119 new AbstractFuture<String>() { 120 { 121 set("foo"); 122 } 123 }; 124 assertFalse(future.cancel(true)); 125 assertFalse(future.isCancelled()); 126 assertTrue(future.isDone()); 127 } 128 testGetWithTimeoutDoneFuture()129 public void testGetWithTimeoutDoneFuture() throws Exception { 130 AbstractFuture<String> future = 131 new AbstractFuture<String>() { 132 { 133 set("foo"); 134 } 135 }; 136 assertEquals("foo", future.get(0, TimeUnit.SECONDS)); 137 } 138 testEvilFuture_setFuture()139 public void testEvilFuture_setFuture() throws Exception { 140 final RuntimeException exception = new RuntimeException("you didn't say the magic word!"); 141 AbstractFuture<String> evilFuture = 142 new AbstractFuture<String>() { 143 @Override 144 public void addListener(Runnable r, Executor e) { 145 throw exception; 146 } 147 }; 148 AbstractFuture<String> normalFuture = new AbstractFuture<String>() {}; 149 normalFuture.setFuture(evilFuture); 150 assertTrue(normalFuture.isDone()); 151 ExecutionException e = assertThrows(ExecutionException.class, () -> normalFuture.get()); 152 assertThat(e).hasCauseThat().isSameInstanceAs(exception); 153 } 154 testRemoveWaiter_interruption()155 public void testRemoveWaiter_interruption() throws Exception { 156 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 157 WaiterThread waiter1 = new WaiterThread(future); 158 waiter1.start(); 159 waiter1.awaitWaiting(); 160 161 WaiterThread waiter2 = new WaiterThread(future); 162 waiter2.start(); 163 waiter2.awaitWaiting(); 164 // The waiter queue should be waiter2->waiter1 165 166 // This should wake up waiter1 and cause the waiter1 node to be removed. 167 waiter1.interrupt(); 168 169 waiter1.join(); 170 waiter2.awaitWaiting(); // should still be blocked 171 172 LockSupport.unpark(waiter2); // spurious wakeup 173 waiter2.awaitWaiting(); // should eventually re-park 174 175 future.set(null); 176 waiter2.join(); 177 } 178 testRemoveWaiter_polling()179 public void testRemoveWaiter_polling() throws Exception { 180 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 181 WaiterThread waiter = new WaiterThread(future); 182 waiter.start(); 183 waiter.awaitWaiting(); 184 PollingThread poller = new PollingThread(future); 185 poller.start(); 186 PollingThread poller2 = new PollingThread(future); 187 poller2.start(); 188 PollingThread poller3 = new PollingThread(future); 189 poller3.start(); 190 poller.awaitInLoop(); 191 poller2.awaitInLoop(); 192 poller3.awaitInLoop(); 193 194 // The waiter queue should be {poller x 3}->waiter1 195 waiter.interrupt(); 196 197 // This should wake up waiter1 and cause the waiter1 node to be removed. 198 waiter.join(); 199 future.set(null); 200 poller.join(); 201 } 202 testToString_allUnique()203 public void testToString_allUnique() throws Exception { 204 // Two futures should not have the same toString, to avoid people asserting on it 205 assertThat(SettableFuture.create().toString()).isNotEqualTo(SettableFuture.create().toString()); 206 } 207 testToString_oom()208 public void testToString_oom() throws Exception { 209 SettableFuture<Object> future = SettableFuture.create(); 210 future.set( 211 new Object() { 212 @Override 213 public String toString() { 214 throw new OutOfMemoryError(); 215 } 216 217 @Override 218 public int hashCode() { 219 throw new OutOfMemoryError(); 220 } 221 }); 222 223 String unused = future.toString(); 224 225 SettableFuture<Object> future2 = SettableFuture.create(); 226 227 // A more organic OOM from a toString implementation 228 Object object = 229 new Object() { 230 @Override 231 public String toString() { 232 return new String(new char[50_000]); 233 } 234 }; 235 List<Object> list = Collections.singletonList(object); 236 for (int i = 0; i < 10; i++) { 237 Object[] array = new Object[500]; 238 Arrays.fill(array, list); 239 list = Arrays.asList(array); 240 } 241 future2.set(list); 242 243 unused = future.toString(); 244 } 245 testToString_notDone()246 public void testToString_notDone() throws Exception { 247 AbstractFuture<Object> testFuture = 248 new AbstractFuture<Object>() { 249 @Override 250 public String pendingToString() { 251 return "cause=[Because this test isn't done]"; 252 } 253 }; 254 assertThat(testFuture.toString()) 255 .matches( 256 "[^\\[]+\\[status=PENDING, info=\\[cause=\\[Because this test isn't done\\]\\]\\]"); 257 TimeoutException e = 258 assertThrows(TimeoutException.class, () -> testFuture.get(1, TimeUnit.NANOSECONDS)); 259 assertThat(e.getMessage()).contains("1 nanoseconds"); 260 assertThat(e.getMessage()).contains("Because this test isn't done"); 261 } 262 testToString_completesDuringToString()263 public void testToString_completesDuringToString() throws Exception { 264 AbstractFuture<Object> testFuture = 265 new AbstractFuture<Object>() { 266 @Override 267 public String pendingToString() { 268 // Complete ourselves during the toString calculation 269 this.set(true); 270 return "cause=[Because this test isn't done]"; 271 } 272 }; 273 assertThat(testFuture.toString()) 274 .matches("[^\\[]+\\[status=SUCCESS, result=\\[java.lang.Boolean@\\w+\\]\\]"); 275 } 276 277 /** 278 * This test attempts to cause a future to wait for longer than it was requested to from a timed 279 * get() call. As measurements of time are prone to flakiness, it tries to assert based on ranges 280 * derived from observing how much time actually passed for various operations. 281 */ 282 @SuppressWarnings({"DeprecatedThreadMethods", "ThreadPriorityCheck"}) 283 @AndroidIncompatible // Thread.suspend testToString_delayedTimeout()284 public void testToString_delayedTimeout() throws Exception { 285 Integer javaVersion = Ints.tryParse(JAVA_SPECIFICATION_VERSION.value()); 286 // Parsing to an integer might fail because Java 8 returns "1.8" instead of "8." 287 // We can continue if it's 1.8, and we can continue if it's an integer in [9, 20). 288 if (javaVersion != null && javaVersion >= 20) { 289 // TODO(b/261217224): Make this test work under newer JDKs. 290 return; 291 } 292 TimedWaiterThread thread = 293 new TimedWaiterThread(new AbstractFuture<Object>() {}, 2, TimeUnit.SECONDS); 294 thread.start(); 295 thread.awaitWaiting(); 296 thread.suspend(); 297 // Sleep for enough time to add 1500 milliseconds of overwait to the get() call. 298 long toWaitMillis = 3500 - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - thread.startTime); 299 Thread.sleep(toWaitMillis); 300 thread.setPriority(Thread.MAX_PRIORITY); 301 thread.resume(); 302 thread.join(); 303 // It's possible to race and suspend the thread just before the park call actually takes effect, 304 // causing the thread to be suspended for 3.5 seconds, and then park itself for 2 seconds after 305 // being resumed. To avoid a flake in this scenario, calculate how long that thread actually 306 // waited and assert based on that time. Empirically, the race where the thread ends up waiting 307 // for 5.5 seconds happens about 2% of the time. 308 boolean longWait = TimeUnit.NANOSECONDS.toSeconds(thread.timeSpentBlocked) >= 5; 309 // Count how long it actually took to return; we'll accept any number between the expected delay 310 // and the approximate actual delay, to be robust to variance in thread scheduling. 311 char overWaitNanosFirstDigit = 312 Long.toString( 313 thread.timeSpentBlocked - TimeUnit.MILLISECONDS.toNanos(longWait ? 5000 : 3000)) 314 .charAt(0); 315 if (overWaitNanosFirstDigit < '4') { 316 overWaitNanosFirstDigit = '9'; 317 } 318 String nanosRegex = "[4-" + overWaitNanosFirstDigit + "][0-9]+"; 319 assertWithMessage( 320 "Spent " + thread.timeSpentBlocked + " ns blocked; slept for " + toWaitMillis + " ms") 321 .that(thread.exception) 322 .hasMessageThat() 323 .matches( 324 "Waited 2 seconds \\(plus " 325 + (longWait ? "3" : "1") 326 + " seconds, " 327 + nanosRegex 328 + " nanoseconds delay\\).*"); 329 } 330 testToString_completed()331 public void testToString_completed() throws Exception { 332 AbstractFuture<Object> testFuture2 = 333 new AbstractFuture<Object>() { 334 @Override 335 public String pendingToString() { 336 return "cause=[Someday...]"; 337 } 338 }; 339 AbstractFuture<Object> testFuture3 = new AbstractFuture<Object>() {}; 340 testFuture3.setFuture(testFuture2); 341 assertThat(testFuture3.toString()) 342 .matches( 343 "[^\\[]+\\[status=PENDING, setFuture=\\[[^\\[]+\\[status=PENDING," 344 + " info=\\[cause=\\[Someday...]]]]]"); 345 testFuture2.set("result string"); 346 assertThat(testFuture3.toString()) 347 .matches("[^\\[]+\\[status=SUCCESS, result=\\[java.lang.String@\\w+\\]\\]"); 348 } 349 testToString_cancelled()350 public void testToString_cancelled() throws Exception { 351 assertThat(Futures.immediateCancelledFuture().toString()) 352 .matches("[^\\[]+\\[status=CANCELLED\\]"); 353 } 354 testToString_failed()355 public void testToString_failed() { 356 assertThat(Futures.immediateFailedFuture(new RuntimeException("foo")).toString()) 357 .matches("[^\\[]+\\[status=FAILURE, cause=\\[java.lang.RuntimeException: foo\\]\\]"); 358 } 359 testToString_misbehaving()360 public void testToString_misbehaving() throws Exception { 361 assertThat( 362 new AbstractFuture<Object>() { 363 @Override 364 public String pendingToString() { 365 throw new RuntimeException("I'm a misbehaving implementation"); 366 } 367 }.toString()) 368 .matches( 369 "[^\\[]+\\[status=PENDING, info=\\[Exception thrown from implementation: " 370 + "class java.lang.RuntimeException\\]\\]"); 371 } 372 testCompletionFinishesWithDone()373 public void testCompletionFinishesWithDone() { 374 ExecutorService executor = Executors.newFixedThreadPool(10); 375 for (int i = 0; i < 50000; i++) { 376 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 377 final AtomicReference<String> errorMessage = Atomics.newReference(); 378 executor.execute( 379 new Runnable() { 380 @Override 381 public void run() { 382 future.set("success"); 383 if (!future.isDone()) { 384 errorMessage.set("Set call exited before future was complete."); 385 } 386 } 387 }); 388 executor.execute( 389 new Runnable() { 390 @Override 391 public void run() { 392 future.setException(new IllegalArgumentException("failure")); 393 if (!future.isDone()) { 394 errorMessage.set("SetException call exited before future was complete."); 395 } 396 } 397 }); 398 executor.execute( 399 new Runnable() { 400 @Override 401 public void run() { 402 future.cancel(true); 403 if (!future.isDone()) { 404 errorMessage.set("Cancel call exited before future was complete."); 405 } 406 } 407 }); 408 try { 409 future.get(); 410 } catch (Throwable t) { 411 // Ignore, we just wanted to block. 412 } 413 String error = errorMessage.get(); 414 assertNull(error, error); 415 } 416 executor.shutdown(); 417 } 418 419 /** 420 * He did the bash, he did the future bash The future bash, it was a concurrency smash He did the 421 * bash, it caught on in a flash He did the bash, he did the future bash 422 */ 423 testFutureBash()424 public void testFutureBash() { 425 if (isWindows()) { 426 return; // TODO: b/136041958 - Running very slowly on Windows CI. 427 } 428 final CyclicBarrier barrier = 429 new CyclicBarrier( 430 6 // for the setter threads 431 + 50 // for the listeners 432 + 50 // for the blocking get threads, 433 + 1); // for the main thread 434 final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); 435 final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference(); 436 final AtomicInteger numSuccessfulSetCalls = new AtomicInteger(); 437 Callable<@Nullable Void> completeSuccessfullyRunnable = 438 new Callable<@Nullable Void>() { 439 @Override 440 public @Nullable Void call() { 441 if (currentFuture.get().set("set")) { 442 numSuccessfulSetCalls.incrementAndGet(); 443 } 444 awaitUnchecked(barrier); 445 return null; 446 } 447 }; 448 Callable<@Nullable Void> completeExceptionallyRunnable = 449 new Callable<@Nullable Void>() { 450 Exception failureCause = new Exception("setException"); 451 452 @Override 453 public @Nullable Void call() { 454 if (currentFuture.get().setException(failureCause)) { 455 numSuccessfulSetCalls.incrementAndGet(); 456 } 457 awaitUnchecked(barrier); 458 return null; 459 } 460 }; 461 Callable<@Nullable Void> cancelRunnable = 462 new Callable<@Nullable Void>() { 463 @Override 464 public @Nullable Void call() { 465 if (currentFuture.get().cancel(true)) { 466 numSuccessfulSetCalls.incrementAndGet(); 467 } 468 awaitUnchecked(barrier); 469 return null; 470 } 471 }; 472 Callable<@Nullable Void> setFutureCompleteSuccessfullyRunnable = 473 new Callable<@Nullable Void>() { 474 ListenableFuture<String> future = Futures.immediateFuture("setFuture"); 475 476 @Override 477 public @Nullable Void call() { 478 if (currentFuture.get().setFuture(future)) { 479 numSuccessfulSetCalls.incrementAndGet(); 480 } 481 awaitUnchecked(barrier); 482 return null; 483 } 484 }; 485 Callable<@Nullable Void> setFutureCompleteExceptionallyRunnable = 486 new Callable<@Nullable Void>() { 487 ListenableFuture<String> future = 488 Futures.immediateFailedFuture(new Exception("setFuture")); 489 490 @Override 491 public @Nullable Void call() { 492 if (currentFuture.get().setFuture(future)) { 493 numSuccessfulSetCalls.incrementAndGet(); 494 } 495 awaitUnchecked(barrier); 496 return null; 497 } 498 }; 499 Callable<@Nullable Void> setFutureCancelRunnable = 500 new Callable<@Nullable Void>() { 501 ListenableFuture<String> future = Futures.immediateCancelledFuture(); 502 503 @Override 504 public @Nullable Void call() { 505 if (currentFuture.get().setFuture(future)) { 506 numSuccessfulSetCalls.incrementAndGet(); 507 } 508 awaitUnchecked(barrier); 509 return null; 510 } 511 }; 512 final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); 513 Runnable collectResultsRunnable = 514 new Runnable() { 515 @Override 516 public void run() { 517 try { 518 String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); 519 finalResults.add(result); 520 } catch (ExecutionException e) { 521 finalResults.add(e.getCause()); 522 } catch (CancellationException e) { 523 finalResults.add(CancellationException.class); 524 } finally { 525 awaitUnchecked(barrier); 526 } 527 } 528 }; 529 Runnable collectResultsTimedGetRunnable = 530 new Runnable() { 531 @Override 532 public void run() { 533 Future<String> future = currentFuture.get(); 534 while (true) { 535 try { 536 String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS); 537 finalResults.add(result); 538 break; 539 } catch (ExecutionException e) { 540 finalResults.add(e.getCause()); 541 break; 542 } catch (CancellationException e) { 543 finalResults.add(CancellationException.class); 544 break; 545 } catch (TimeoutException e) { 546 // loop 547 } 548 } 549 awaitUnchecked(barrier); 550 } 551 }; 552 List<Callable<?>> allTasks = new ArrayList<>(); 553 allTasks.add(completeSuccessfullyRunnable); 554 allTasks.add(completeExceptionallyRunnable); 555 allTasks.add(cancelRunnable); 556 allTasks.add(setFutureCompleteSuccessfullyRunnable); 557 allTasks.add(setFutureCompleteExceptionallyRunnable); 558 allTasks.add(setFutureCancelRunnable); 559 for (int k = 0; k < 50; k++) { 560 // For each listener we add a task that submits it to the executor directly for the blocking 561 // get use case and another task that adds it as a listener to the future to exercise both 562 // racing addListener calls and addListener calls completing after the future completes. 563 final Runnable listener = 564 k % 2 == 0 ? collectResultsRunnable : collectResultsTimedGetRunnable; 565 allTasks.add(Executors.callable(listener)); 566 allTasks.add( 567 new Callable<@Nullable Void>() { 568 @Override 569 public @Nullable Void call() throws Exception { 570 currentFuture.get().addListener(listener, executor); 571 return null; 572 } 573 }); 574 } 575 assertEquals(allTasks.size() + 1, barrier.getParties()); 576 for (int i = 0; i < 1000; i++) { 577 Collections.shuffle(allTasks); 578 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 579 currentFuture.set(future); 580 for (Callable<?> task : allTasks) { 581 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 582 Future<?> possiblyIgnoredError = executor.submit(task); 583 } 584 awaitUnchecked(barrier); 585 assertThat(future.isDone()).isTrue(); 586 // inspect state and ensure it is correct! 587 // asserts that all get calling threads received the same value 588 Object result = Iterables.getOnlyElement(finalResults); 589 if (result == CancellationException.class) { 590 assertTrue(future.isCancelled()); 591 if (future.wasInterrupted()) { 592 // We were cancelled, it is possible that setFuture could have succeeded too. 593 assertThat(numSuccessfulSetCalls.get()).isIn(Range.closed(1, 2)); 594 } else { 595 assertThat(numSuccessfulSetCalls.get()).isEqualTo(1); 596 } 597 } else { 598 assertThat(numSuccessfulSetCalls.get()).isEqualTo(1); 599 } 600 // reset for next iteration 601 numSuccessfulSetCalls.set(0); 602 finalResults.clear(); 603 } 604 executor.shutdown(); 605 } 606 607 // setFuture and cancel() interact in more complicated ways than the other setters. testSetFutureCancelBash()608 public void testSetFutureCancelBash() { 609 if (isWindows()) { 610 return; // TODO: b/136041958 - Running very slowly on Windows CI. 611 } 612 final int size = 50; 613 final CyclicBarrier barrier = 614 new CyclicBarrier( 615 2 // for the setter threads 616 + size // for the listeners 617 + size // for the get threads, 618 + 1); // for the main thread 619 final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); 620 final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference(); 621 final AtomicReference<AbstractFuture<String>> setFutureFuture = Atomics.newReference(); 622 final AtomicBoolean setFutureSetSuccess = new AtomicBoolean(); 623 final AtomicBoolean setFutureCompletionSuccess = new AtomicBoolean(); 624 final AtomicBoolean cancellationSuccess = new AtomicBoolean(); 625 Runnable cancelRunnable = 626 new Runnable() { 627 @Override 628 public void run() { 629 cancellationSuccess.set(currentFuture.get().cancel(true)); 630 awaitUnchecked(barrier); 631 } 632 }; 633 Runnable setFutureCompleteSuccessfullyRunnable = 634 new Runnable() { 635 @Override 636 public void run() { 637 AbstractFuture<String> future = setFutureFuture.get(); 638 setFutureSetSuccess.set(currentFuture.get().setFuture(future)); 639 setFutureCompletionSuccess.set(future.set("hello-async-world")); 640 awaitUnchecked(barrier); 641 } 642 }; 643 final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); 644 Runnable collectResultsRunnable = 645 new Runnable() { 646 @Override 647 public void run() { 648 try { 649 String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); 650 finalResults.add(result); 651 } catch (ExecutionException e) { 652 finalResults.add(e.getCause()); 653 } catch (CancellationException e) { 654 finalResults.add(CancellationException.class); 655 } finally { 656 awaitUnchecked(barrier); 657 } 658 } 659 }; 660 Runnable collectResultsTimedGetRunnable = 661 new Runnable() { 662 @Override 663 public void run() { 664 Future<String> future = currentFuture.get(); 665 while (true) { 666 try { 667 String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS); 668 finalResults.add(result); 669 break; 670 } catch (ExecutionException e) { 671 finalResults.add(e.getCause()); 672 break; 673 } catch (CancellationException e) { 674 finalResults.add(CancellationException.class); 675 break; 676 } catch (TimeoutException e) { 677 // loop 678 } 679 } 680 awaitUnchecked(barrier); 681 } 682 }; 683 List<Runnable> allTasks = new ArrayList<>(); 684 allTasks.add(cancelRunnable); 685 allTasks.add(setFutureCompleteSuccessfullyRunnable); 686 for (int k = 0; k < size; k++) { 687 // For each listener we add a task that submits it to the executor directly for the blocking 688 // get use case and another task that adds it as a listener to the future to exercise both 689 // racing addListener calls and addListener calls completing after the future completes. 690 final Runnable listener = 691 k % 2 == 0 ? collectResultsRunnable : collectResultsTimedGetRunnable; 692 allTasks.add(listener); 693 allTasks.add( 694 new Runnable() { 695 @Override 696 public void run() { 697 currentFuture.get().addListener(listener, executor); 698 } 699 }); 700 } 701 assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check 702 for (int i = 0; i < 1000; i++) { 703 Collections.shuffle(allTasks); 704 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 705 final AbstractFuture<String> setFuture = new AbstractFuture<String>() {}; 706 currentFuture.set(future); 707 setFutureFuture.set(setFuture); 708 for (Runnable task : allTasks) { 709 executor.execute(task); 710 } 711 awaitUnchecked(barrier); 712 assertThat(future.isDone()).isTrue(); 713 // inspect state and ensure it is correct! 714 // asserts that all get calling threads received the same value 715 Object result = Iterables.getOnlyElement(finalResults); 716 if (result == CancellationException.class) { 717 assertTrue(future.isCancelled()); 718 assertTrue(cancellationSuccess.get()); 719 // cancellation can interleave in 3 ways 720 // 1. prior to setFuture 721 // 2. after setFuture before set() on the future assigned 722 // 3. after setFuture and set() are called but before the listener completes. 723 if (!setFutureSetSuccess.get() || !setFutureCompletionSuccess.get()) { 724 // If setFuture fails or set on the future fails then it must be because that future was 725 // cancelled 726 assertTrue(setFuture.isCancelled()); 727 assertTrue(setFuture.wasInterrupted()); // we only call cancel(true) 728 } 729 } else { 730 // set on the future completed 731 assertFalse(cancellationSuccess.get()); 732 assertTrue(setFutureSetSuccess.get()); 733 assertTrue(setFutureCompletionSuccess.get()); 734 } 735 // reset for next iteration 736 setFutureSetSuccess.set(false); 737 setFutureCompletionSuccess.set(false); 738 cancellationSuccess.set(false); 739 finalResults.clear(); 740 } 741 executor.shutdown(); 742 } 743 744 // Test to ensure that when calling setFuture with a done future only setFuture or cancel can 745 // return true. testSetFutureCancelBash_withDoneFuture()746 public void testSetFutureCancelBash_withDoneFuture() { 747 final CyclicBarrier barrier = 748 new CyclicBarrier( 749 2 // for the setter threads 750 + 1 // for the blocking get thread, 751 + 1); // for the main thread 752 final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); 753 final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference(); 754 final AtomicBoolean setFutureSuccess = new AtomicBoolean(); 755 final AtomicBoolean cancellationSuccess = new AtomicBoolean(); 756 Callable<@Nullable Void> cancelRunnable = 757 new Callable<@Nullable Void>() { 758 @Override 759 public @Nullable Void call() { 760 cancellationSuccess.set(currentFuture.get().cancel(true)); 761 awaitUnchecked(barrier); 762 return null; 763 } 764 }; 765 Callable<@Nullable Void> setFutureCompleteSuccessfullyRunnable = 766 new Callable<@Nullable Void>() { 767 final ListenableFuture<String> future = Futures.immediateFuture("hello"); 768 769 @Override 770 public @Nullable Void call() { 771 setFutureSuccess.set(currentFuture.get().setFuture(future)); 772 awaitUnchecked(barrier); 773 return null; 774 } 775 }; 776 final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); 777 final Runnable collectResultsRunnable = 778 new Runnable() { 779 @Override 780 public void run() { 781 try { 782 String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); 783 finalResults.add(result); 784 } catch (ExecutionException e) { 785 finalResults.add(e.getCause()); 786 } catch (CancellationException e) { 787 finalResults.add(CancellationException.class); 788 } finally { 789 awaitUnchecked(barrier); 790 } 791 } 792 }; 793 List<Callable<?>> allTasks = new ArrayList<>(); 794 allTasks.add(cancelRunnable); 795 allTasks.add(setFutureCompleteSuccessfullyRunnable); 796 allTasks.add(Executors.callable(collectResultsRunnable)); 797 assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check 798 for (int i = 0; i < 1000; i++) { 799 Collections.shuffle(allTasks); 800 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 801 currentFuture.set(future); 802 for (Callable<?> task : allTasks) { 803 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 804 Future<?> possiblyIgnoredError = executor.submit(task); 805 } 806 awaitUnchecked(barrier); 807 assertThat(future.isDone()).isTrue(); 808 // inspect state and ensure it is correct! 809 // asserts that all get calling threads received the same value 810 Object result = Iterables.getOnlyElement(finalResults); 811 if (result == CancellationException.class) { 812 assertTrue(future.isCancelled()); 813 assertTrue(cancellationSuccess.get()); 814 assertFalse(setFutureSuccess.get()); 815 } else { 816 assertTrue(setFutureSuccess.get()); 817 assertFalse(cancellationSuccess.get()); 818 } 819 // reset for next iteration 820 setFutureSuccess.set(false); 821 cancellationSuccess.set(false); 822 finalResults.clear(); 823 } 824 executor.shutdown(); 825 } 826 827 // In a previous implementation this would cause a stack overflow after ~2000 futures chained 828 // together. Now it should only be limited by available memory (and time) testSetFuture_stackOverflow()829 public void testSetFuture_stackOverflow() { 830 SettableFuture<String> orig = SettableFuture.create(); 831 SettableFuture<String> prev = orig; 832 for (int i = 0; i < 100000; i++) { 833 SettableFuture<String> curr = SettableFuture.create(); 834 prev.setFuture(curr); 835 prev = curr; 836 } 837 // prev represents the 'innermost' future 838 prev.set("done"); 839 assertTrue(orig.isDone()); 840 } 841 842 // Verify that StackOverflowError in a long chain of SetFuture doesn't cause the entire toString 843 // call to fail 844 @GwtIncompatible 845 @AndroidIncompatible testSetFutureToString_stackOverflow()846 public void testSetFutureToString_stackOverflow() { 847 SettableFuture<String> orig = SettableFuture.create(); 848 SettableFuture<String> prev = orig; 849 for (int i = 0; i < 100000; i++) { 850 SettableFuture<String> curr = SettableFuture.create(); 851 prev.setFuture(curr); 852 prev = curr; 853 } 854 // orig represents the 'outermost' future 855 assertThat(orig.toString()) 856 .contains("Exception thrown from implementation: class java.lang.StackOverflowError"); 857 } 858 testSetFuture_misbehavingFutureThrows()859 public void testSetFuture_misbehavingFutureThrows() throws Exception { 860 SettableFuture<String> future = SettableFuture.create(); 861 ListenableFuture<String> badFuture = 862 new ListenableFuture<String>() { 863 @Override 864 public boolean cancel(boolean interrupt) { 865 return false; 866 } 867 868 @Override 869 public boolean isDone() { 870 return true; 871 } 872 873 @Override 874 public boolean isCancelled() { 875 return false; // BAD!! 876 } 877 878 @Override 879 public String get() { 880 throw new CancellationException(); // BAD!! 881 } 882 883 @Override 884 public String get(long time, TimeUnit unit) { 885 throw new CancellationException(); // BAD!! 886 } 887 888 @Override 889 public void addListener(Runnable runnable, Executor executor) { 890 executor.execute(runnable); 891 } 892 }; 893 future.setFuture(badFuture); 894 ExecutionException expected = getExpectingExecutionException(future); 895 assertThat(expected).hasCauseThat().isInstanceOf(IllegalArgumentException.class); 896 assertThat(expected).hasCauseThat().hasMessageThat().contains(badFuture.toString()); 897 } 898 testSetFuture_misbehavingFutureDoesNotThrow()899 public void testSetFuture_misbehavingFutureDoesNotThrow() throws Exception { 900 SettableFuture<String> future = SettableFuture.create(); 901 ListenableFuture<String> badFuture = 902 new ListenableFuture<String>() { 903 @Override 904 public boolean cancel(boolean interrupt) { 905 return false; 906 } 907 908 @Override 909 public boolean isDone() { 910 return true; 911 } 912 913 @Override 914 public boolean isCancelled() { 915 return true; // BAD!! 916 } 917 918 @Override 919 public String get() { 920 return "foo"; // BAD!! 921 } 922 923 @Override 924 public String get(long time, TimeUnit unit) { 925 return "foo"; // BAD!! 926 } 927 928 @Override 929 public void addListener(Runnable runnable, Executor executor) { 930 executor.execute(runnable); 931 } 932 }; 933 future.setFuture(badFuture); 934 assertThat(future.isCancelled()).isTrue(); 935 } 936 testCancel_stackOverflow()937 public void testCancel_stackOverflow() { 938 SettableFuture<String> orig = SettableFuture.create(); 939 SettableFuture<String> prev = orig; 940 for (int i = 0; i < 100000; i++) { 941 SettableFuture<String> curr = SettableFuture.create(); 942 prev.setFuture(curr); 943 prev = curr; 944 } 945 // orig is the 'outermost future', this should propagate fully down the stack of futures. 946 orig.cancel(true); 947 assertTrue(orig.isCancelled()); 948 assertTrue(prev.isCancelled()); 949 assertTrue(prev.wasInterrupted()); 950 } 951 testSetFutureSelf_cancel()952 public void testSetFutureSelf_cancel() { 953 SettableFuture<String> orig = SettableFuture.create(); 954 orig.setFuture(orig); 955 orig.cancel(true); 956 assertTrue(orig.isCancelled()); 957 } 958 testSetFutureSelf_toString()959 public void testSetFutureSelf_toString() { 960 SettableFuture<String> orig = SettableFuture.create(); 961 orig.setFuture(orig); 962 assertThat(orig.toString()).contains("[status=PENDING, setFuture=[this future]]"); 963 } 964 testSetSelf_toString()965 public void testSetSelf_toString() { 966 SettableFuture<Object> orig = SettableFuture.create(); 967 orig.set(orig); 968 assertThat(orig.toString()).contains("[status=SUCCESS, result=[this future]]"); 969 } 970 testSetFutureSelf_toStringException()971 public void testSetFutureSelf_toStringException() { 972 SettableFuture<String> orig = SettableFuture.create(); 973 orig.setFuture( 974 new AbstractFuture<String>() { 975 @Override 976 public String toString() { 977 throw new NullPointerException(); 978 } 979 }); 980 assertThat(orig.toString()) 981 .contains( 982 "[status=PENDING, setFuture=[Exception thrown from implementation: class" 983 + " java.lang.NullPointerException]]"); 984 } 985 testSetIndirectSelf_toString()986 public void testSetIndirectSelf_toString() { 987 final SettableFuture<Object> orig = SettableFuture.create(); 988 // unlike the above this indirection defeats the trivial cycle detection and causes a SOE 989 orig.setFuture( 990 new ForwardingListenableFuture<Object>() { 991 @Override 992 protected ListenableFuture<Object> delegate() { 993 return orig; 994 } 995 }); 996 assertThat(orig.toString()) 997 .contains("Exception thrown from implementation: class java.lang.StackOverflowError"); 998 } 999 1000 // Regression test for a case where we would fail to execute listeners immediately on done futures 1001 // this would be observable from an afterDone callback testListenersExecuteImmediately_fromAfterDone()1002 public void testListenersExecuteImmediately_fromAfterDone() { 1003 AbstractFuture<String> f = 1004 new AbstractFuture<String>() { 1005 @Override 1006 protected void afterDone() { 1007 final AtomicBoolean ranImmediately = new AtomicBoolean(); 1008 addListener( 1009 new Runnable() { 1010 @Override 1011 public void run() { 1012 ranImmediately.set(true); 1013 } 1014 }, 1015 MoreExecutors.directExecutor()); 1016 assertThat(ranImmediately.get()).isTrue(); 1017 } 1018 }; 1019 f.set("foo"); 1020 } 1021 1022 // Regression test for a case where we would fail to execute listeners immediately on done futures 1023 // this would be observable from a waiter that was just unblocked. testListenersExecuteImmediately_afterWaiterWakesUp()1024 public void testListenersExecuteImmediately_afterWaiterWakesUp() throws Exception { 1025 final AbstractFuture<String> f = 1026 new AbstractFuture<String>() { 1027 @Override 1028 protected void afterDone() { 1029 // this simply delays executing listeners 1030 try { 1031 Thread.sleep(TimeUnit.SECONDS.toMillis(10)); 1032 } catch (InterruptedException ignored) { 1033 Thread.currentThread().interrupt(); // preserve status 1034 } 1035 } 1036 }; 1037 Thread t = 1038 new Thread() { 1039 @Override 1040 public void run() { 1041 f.set("foo"); 1042 } 1043 }; 1044 t.start(); 1045 f.get(); 1046 final AtomicBoolean ranImmediately = new AtomicBoolean(); 1047 f.addListener( 1048 new Runnable() { 1049 @Override 1050 public void run() { 1051 ranImmediately.set(true); 1052 } 1053 }, 1054 MoreExecutors.directExecutor()); 1055 assertThat(ranImmediately.get()).isTrue(); 1056 t.interrupt(); 1057 t.join(); 1058 } 1059 testCatchesUndeclaredThrowableFromListener()1060 public void testCatchesUndeclaredThrowableFromListener() { 1061 AbstractFuture<String> f = new AbstractFuture<String>() {}; 1062 f.set("foo"); 1063 f.addListener(() -> sneakyThrow(new SomeCheckedException()), directExecutor()); 1064 } 1065 1066 private static final class SomeCheckedException extends Exception {} 1067 1068 /** Throws an undeclared checked exception. */ sneakyThrow(Throwable t)1069 private static void sneakyThrow(Throwable t) { 1070 class SneakyThrower<T extends Throwable> { 1071 @SuppressWarnings("unchecked") // intentionally unsafe for test 1072 void throwIt(Throwable t) throws T { 1073 throw (T) t; 1074 } 1075 } 1076 new SneakyThrower<Error>().throwIt(t); 1077 } 1078 testTrustedGetFailure_Completed()1079 public void testTrustedGetFailure_Completed() { 1080 SettableFuture<String> future = SettableFuture.create(); 1081 future.set("261"); 1082 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1083 } 1084 testTrustedGetFailure_Failed()1085 public void testTrustedGetFailure_Failed() { 1086 SettableFuture<String> future = SettableFuture.create(); 1087 Throwable failure = new Throwable(); 1088 future.setException(failure); 1089 assertThat(future.tryInternalFastPathGetFailure()).isEqualTo(failure); 1090 } 1091 testTrustedGetFailure_NotCompleted()1092 public void testTrustedGetFailure_NotCompleted() { 1093 SettableFuture<String> future = SettableFuture.create(); 1094 assertThat(future.isDone()).isFalse(); 1095 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1096 } 1097 testTrustedGetFailure_CanceledNoCause()1098 public void testTrustedGetFailure_CanceledNoCause() { 1099 SettableFuture<String> future = SettableFuture.create(); 1100 future.cancel(false); 1101 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1102 } 1103 testGetFailure_Completed()1104 public void testGetFailure_Completed() { 1105 AbstractFuture<String> future = new AbstractFuture<String>() {}; 1106 future.set("261"); 1107 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1108 } 1109 testGetFailure_Failed()1110 public void testGetFailure_Failed() { 1111 AbstractFuture<String> future = new AbstractFuture<String>() {}; 1112 final Throwable failure = new Throwable(); 1113 future.setException(failure); 1114 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1115 } 1116 testGetFailure_NotCompleted()1117 public void testGetFailure_NotCompleted() { 1118 AbstractFuture<String> future = new AbstractFuture<String>() {}; 1119 assertThat(future.isDone()).isFalse(); 1120 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1121 } 1122 testGetFailure_CanceledNoCause()1123 public void testGetFailure_CanceledNoCause() { 1124 AbstractFuture<String> future = new AbstractFuture<String>() {}; 1125 future.cancel(false); 1126 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1127 } 1128 testForwardExceptionFastPath()1129 public void testForwardExceptionFastPath() throws Exception { 1130 class FailFuture extends InternalFutureFailureAccess implements ListenableFuture<String> { 1131 Throwable failure; 1132 1133 FailFuture(Throwable throwable) { 1134 failure = throwable; 1135 } 1136 1137 @Override 1138 public boolean cancel(boolean mayInterruptIfRunning) { 1139 throw new AssertionFailedError("cancel shouldn't be called on this object"); 1140 } 1141 1142 @Override 1143 public boolean isCancelled() { 1144 return false; 1145 } 1146 1147 @Override 1148 public boolean isDone() { 1149 return true; 1150 } 1151 1152 @Override 1153 public String get() throws InterruptedException, ExecutionException { 1154 throw new AssertionFailedError("get() shouldn't be called on this object"); 1155 } 1156 1157 @Override 1158 public String get(long timeout, TimeUnit unit) 1159 throws InterruptedException, ExecutionException, TimeoutException { 1160 return get(); 1161 } 1162 1163 @Override 1164 protected Throwable tryInternalFastPathGetFailure() { 1165 return failure; 1166 } 1167 1168 @Override 1169 public void addListener(Runnable listener, Executor executor) { 1170 throw new AssertionFailedError("addListener() shouldn't be called on this object"); 1171 } 1172 } 1173 1174 final RuntimeException exception = new RuntimeException("you still didn't say the magic word!"); 1175 SettableFuture<String> normalFuture = SettableFuture.create(); 1176 normalFuture.setFuture(new FailFuture(exception)); 1177 assertTrue(normalFuture.isDone()); 1178 ExecutionException e = assertThrows(ExecutionException.class, () -> normalFuture.get()); 1179 assertSame(exception, e.getCause()); 1180 } 1181 awaitUnchecked(final CyclicBarrier barrier)1182 private static void awaitUnchecked(final CyclicBarrier barrier) { 1183 try { 1184 barrier.await(); 1185 } catch (Exception e) { 1186 throw new RuntimeException(e); 1187 } 1188 } 1189 checkStackTrace(ExecutionException e)1190 private void checkStackTrace(ExecutionException e) { 1191 // Our call site for get() should be in the trace. 1192 int index = findStackFrame(e, getClass().getName(), "getExpectingExecutionException"); 1193 1194 assertThat(index).isNotEqualTo(0); 1195 1196 // Above our method should be the call to get(). Don't assert on the class 1197 // because it could be some superclass. 1198 assertThat(e.getStackTrace()[index - 1].getMethodName()).isEqualTo("get"); 1199 } 1200 findStackFrame(ExecutionException e, String clazz, String method)1201 private static int findStackFrame(ExecutionException e, String clazz, String method) { 1202 StackTraceElement[] elements = e.getStackTrace(); 1203 for (int i = 0; i < elements.length; i++) { 1204 StackTraceElement element = elements[i]; 1205 if (element.getClassName().equals(clazz) && element.getMethodName().equals(method)) { 1206 return i; 1207 } 1208 } 1209 AssertionFailedError failure = 1210 new AssertionFailedError( 1211 "Expected element " + clazz + "." + method + " not found in stack trace"); 1212 failure.initCause(e); 1213 throw failure; 1214 } 1215 getExpectingExecutionException(AbstractFuture<String> future)1216 private ExecutionException getExpectingExecutionException(AbstractFuture<String> future) 1217 throws InterruptedException { 1218 try { 1219 String got = future.get(); 1220 throw new AssertionError("Expected exception but got " + got); 1221 } catch (ExecutionException e) { 1222 return e; 1223 } 1224 } 1225 1226 private static final class WaiterThread extends Thread { 1227 private final AbstractFuture<?> future; 1228 WaiterThread(AbstractFuture<?> future)1229 private WaiterThread(AbstractFuture<?> future) { 1230 this.future = future; 1231 } 1232 1233 @Override run()1234 public void run() { 1235 try { 1236 future.get(); 1237 } catch (Exception e) { 1238 // nothing 1239 } 1240 } 1241 awaitWaiting()1242 void awaitWaiting() { 1243 while (!isBlocked()) { 1244 if (getState() == State.TERMINATED) { 1245 throw new RuntimeException("Thread exited"); 1246 } 1247 Thread.yield(); 1248 } 1249 } 1250 isBlocked()1251 private boolean isBlocked() { 1252 return getState() == Thread.State.WAITING && LockSupport.getBlocker(this) == future; 1253 } 1254 } 1255 1256 static final class TimedWaiterThread extends Thread { 1257 private final AbstractFuture<?> future; 1258 private final long timeout; 1259 private final TimeUnit unit; 1260 private Exception exception; 1261 private volatile long startTime; 1262 private long timeSpentBlocked; 1263 TimedWaiterThread(AbstractFuture<?> future, long timeout, TimeUnit unit)1264 TimedWaiterThread(AbstractFuture<?> future, long timeout, TimeUnit unit) { 1265 this.future = future; 1266 this.timeout = timeout; 1267 this.unit = unit; 1268 } 1269 1270 @Override run()1271 public void run() { 1272 startTime = System.nanoTime(); 1273 try { 1274 future.get(timeout, unit); 1275 } catch (Exception e) { 1276 // nothing 1277 exception = e; 1278 } finally { 1279 timeSpentBlocked = System.nanoTime() - startTime; 1280 } 1281 } 1282 awaitWaiting()1283 void awaitWaiting() { 1284 while (!isBlocked()) { 1285 if (getState() == State.TERMINATED) { 1286 throw new RuntimeException("Thread exited"); 1287 } 1288 Thread.yield(); 1289 } 1290 } 1291 isBlocked()1292 private boolean isBlocked() { 1293 return getState() == Thread.State.TIMED_WAITING && LockSupport.getBlocker(this) == future; 1294 } 1295 } 1296 1297 private static final class PollingThread extends Thread { 1298 private final AbstractFuture<?> future; 1299 private final CountDownLatch completedIteration = new CountDownLatch(10); 1300 PollingThread(AbstractFuture<?> future)1301 private PollingThread(AbstractFuture<?> future) { 1302 this.future = future; 1303 } 1304 1305 @Override run()1306 public void run() { 1307 while (true) { 1308 try { 1309 future.get(0, TimeUnit.SECONDS); 1310 return; 1311 } catch (InterruptedException | ExecutionException e) { 1312 return; 1313 } catch (TimeoutException e) { 1314 // do nothing 1315 } finally { 1316 completedIteration.countDown(); 1317 } 1318 } 1319 } 1320 awaitInLoop()1321 void awaitInLoop() { 1322 Uninterruptibles.awaitUninterruptibly(completedIteration); 1323 } 1324 } 1325 1326 private static final class InterruptibleFuture extends AbstractFuture<String> { 1327 boolean interruptTaskWasCalled; 1328 1329 @Override interruptTask()1330 protected void interruptTask() { 1331 assertFalse(interruptTaskWasCalled); 1332 interruptTaskWasCalled = true; 1333 } 1334 } 1335 isWindows()1336 private static boolean isWindows() { 1337 return OS_NAME.value().startsWith("Windows"); 1338 } 1339 } 1340