1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.StandardSystemProperty.JAVA_SPECIFICATION_VERSION; 20 import static com.google.common.base.StandardSystemProperty.OS_NAME; 21 import static com.google.common.truth.Truth.assertThat; 22 import static com.google.common.truth.Truth.assertWithMessage; 23 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 24 import static org.junit.Assert.assertThrows; 25 26 import com.google.common.annotations.GwtIncompatible; 27 import com.google.common.annotations.J2ktIncompatible; 28 import com.google.common.collect.Iterables; 29 import com.google.common.collect.Range; 30 import com.google.common.collect.Sets; 31 import com.google.common.primitives.Ints; 32 import com.google.common.util.concurrent.internal.InternalFutureFailureAccess; 33 import java.util.ArrayList; 34 import java.util.Arrays; 35 import java.util.Collections; 36 import java.util.List; 37 import java.util.Set; 38 import java.util.concurrent.Callable; 39 import java.util.concurrent.CancellationException; 40 import java.util.concurrent.CountDownLatch; 41 import java.util.concurrent.CyclicBarrier; 42 import java.util.concurrent.ExecutionException; 43 import java.util.concurrent.Executor; 44 import java.util.concurrent.ExecutorService; 45 import java.util.concurrent.Executors; 46 import java.util.concurrent.Future; 47 import java.util.concurrent.TimeUnit; 48 import java.util.concurrent.TimeoutException; 49 import java.util.concurrent.atomic.AtomicBoolean; 50 import java.util.concurrent.atomic.AtomicInteger; 51 import java.util.concurrent.atomic.AtomicReference; 52 import java.util.concurrent.locks.LockSupport; 53 import junit.framework.AssertionFailedError; 54 import junit.framework.TestCase; 55 import org.checkerframework.checker.nullness.qual.Nullable; 56 57 /** 58 * Tests for {@link AbstractFuture}. 59 * 60 * @author Brian Stoler 61 */ 62 public class AbstractFutureTest extends TestCase { testSuccess()63 public void testSuccess() throws ExecutionException, InterruptedException { 64 final Object value = new Object(); 65 assertSame( 66 value, 67 new AbstractFuture<Object>() { 68 { 69 set(value); 70 } 71 }.get()); 72 } 73 testException()74 public void testException() throws InterruptedException { 75 final Throwable failure = new Throwable(); 76 AbstractFuture<String> future = 77 new AbstractFuture<String>() { 78 { 79 setException(failure); 80 } 81 }; 82 83 ExecutionException ee1 = getExpectingExecutionException(future); 84 ExecutionException ee2 = getExpectingExecutionException(future); 85 86 // Ensure we get a unique execution exception on each get 87 assertNotSame(ee1, ee2); 88 89 assertThat(ee1).hasCauseThat().isSameInstanceAs(failure); 90 assertThat(ee2).hasCauseThat().isSameInstanceAs(failure); 91 92 checkStackTrace(ee1); 93 checkStackTrace(ee2); 94 } 95 testCancel_notDoneNoInterrupt()96 public void testCancel_notDoneNoInterrupt() throws Exception { 97 InterruptibleFuture future = new InterruptibleFuture(); 98 assertTrue(future.cancel(false)); 99 assertTrue(future.isCancelled()); 100 assertTrue(future.isDone()); 101 assertFalse(future.wasInterrupted()); 102 assertFalse(future.interruptTaskWasCalled); 103 CancellationException e = assertThrows(CancellationException.class, () -> future.get()); 104 assertThat(e).hasCauseThat().isNull(); 105 } 106 testCancel_notDoneInterrupt()107 public void testCancel_notDoneInterrupt() throws Exception { 108 InterruptibleFuture future = new InterruptibleFuture(); 109 assertTrue(future.cancel(true)); 110 assertTrue(future.isCancelled()); 111 assertTrue(future.isDone()); 112 assertTrue(future.wasInterrupted()); 113 assertTrue(future.interruptTaskWasCalled); 114 CancellationException e = assertThrows(CancellationException.class, () -> future.get()); 115 assertThat(e).hasCauseThat().isNull(); 116 } 117 testCancel_done()118 public void testCancel_done() throws Exception { 119 AbstractFuture<String> future = 120 new AbstractFuture<String>() { 121 { 122 set("foo"); 123 } 124 }; 125 assertFalse(future.cancel(true)); 126 assertFalse(future.isCancelled()); 127 assertTrue(future.isDone()); 128 } 129 testGetWithTimeoutDoneFuture()130 public void testGetWithTimeoutDoneFuture() throws Exception { 131 AbstractFuture<String> future = 132 new AbstractFuture<String>() { 133 { 134 set("foo"); 135 } 136 }; 137 assertEquals("foo", future.get(0, TimeUnit.SECONDS)); 138 } 139 testEvilFuture_setFuture()140 public void testEvilFuture_setFuture() throws Exception { 141 final RuntimeException exception = new RuntimeException("you didn't say the magic word!"); 142 AbstractFuture<String> evilFuture = 143 new AbstractFuture<String>() { 144 @Override 145 public void addListener(Runnable r, Executor e) { 146 throw exception; 147 } 148 }; 149 AbstractFuture<String> normalFuture = new AbstractFuture<String>() {}; 150 normalFuture.setFuture(evilFuture); 151 assertTrue(normalFuture.isDone()); 152 ExecutionException e = assertThrows(ExecutionException.class, () -> normalFuture.get()); 153 assertThat(e).hasCauseThat().isSameInstanceAs(exception); 154 } 155 testRemoveWaiter_interruption()156 public void testRemoveWaiter_interruption() throws Exception { 157 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 158 WaiterThread waiter1 = new WaiterThread(future); 159 waiter1.start(); 160 waiter1.awaitWaiting(); 161 162 WaiterThread waiter2 = new WaiterThread(future); 163 waiter2.start(); 164 waiter2.awaitWaiting(); 165 // The waiter queue should be waiter2->waiter1 166 167 // This should wake up waiter1 and cause the waiter1 node to be removed. 168 waiter1.interrupt(); 169 170 waiter1.join(); 171 waiter2.awaitWaiting(); // should still be blocked 172 173 LockSupport.unpark(waiter2); // spurious wakeup 174 waiter2.awaitWaiting(); // should eventually re-park 175 176 future.set(null); 177 waiter2.join(); 178 } 179 testRemoveWaiter_polling()180 public void testRemoveWaiter_polling() throws Exception { 181 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 182 WaiterThread waiter = new WaiterThread(future); 183 waiter.start(); 184 waiter.awaitWaiting(); 185 PollingThread poller = new PollingThread(future); 186 poller.start(); 187 PollingThread poller2 = new PollingThread(future); 188 poller2.start(); 189 PollingThread poller3 = new PollingThread(future); 190 poller3.start(); 191 poller.awaitInLoop(); 192 poller2.awaitInLoop(); 193 poller3.awaitInLoop(); 194 195 // The waiter queue should be {poller x 3}->waiter1 196 waiter.interrupt(); 197 198 // This should wake up waiter1 and cause the waiter1 node to be removed. 199 waiter.join(); 200 future.set(null); 201 poller.join(); 202 } 203 testToString_allUnique()204 public void testToString_allUnique() throws Exception { 205 // Two futures should not have the same toString, to avoid people asserting on it 206 assertThat(SettableFuture.create().toString()).isNotEqualTo(SettableFuture.create().toString()); 207 } 208 testToString_oom()209 public void testToString_oom() throws Exception { 210 SettableFuture<Object> future = SettableFuture.create(); 211 future.set( 212 new Object() { 213 @Override 214 public String toString() { 215 throw new OutOfMemoryError(); 216 } 217 218 @Override 219 public int hashCode() { 220 throw new OutOfMemoryError(); 221 } 222 }); 223 224 String unused = future.toString(); 225 226 SettableFuture<Object> future2 = SettableFuture.create(); 227 228 // A more organic OOM from a toString implementation 229 Object object = 230 new Object() { 231 @Override 232 public String toString() { 233 return new String(new char[50_000]); 234 } 235 }; 236 List<Object> list = Collections.singletonList(object); 237 for (int i = 0; i < 10; i++) { 238 Object[] array = new Object[500]; 239 Arrays.fill(array, list); 240 list = Arrays.asList(array); 241 } 242 future2.set(list); 243 244 unused = future.toString(); 245 } 246 testToString_notDone()247 public void testToString_notDone() throws Exception { 248 AbstractFuture<Object> testFuture = 249 new AbstractFuture<Object>() { 250 @Override 251 public String pendingToString() { 252 return "cause=[Because this test isn't done]"; 253 } 254 }; 255 assertThat(testFuture.toString()) 256 .matches( 257 "[^\\[]+\\[status=PENDING, info=\\[cause=\\[Because this test isn't done\\]\\]\\]"); 258 TimeoutException e = 259 assertThrows(TimeoutException.class, () -> testFuture.get(1, TimeUnit.NANOSECONDS)); 260 assertThat(e.getMessage()).contains("1 nanoseconds"); 261 assertThat(e.getMessage()).contains("Because this test isn't done"); 262 } 263 testToString_completesDuringToString()264 public void testToString_completesDuringToString() throws Exception { 265 AbstractFuture<Object> testFuture = 266 new AbstractFuture<Object>() { 267 @Override 268 public String pendingToString() { 269 // Complete ourselves during the toString calculation 270 this.set(true); 271 return "cause=[Because this test isn't done]"; 272 } 273 }; 274 assertThat(testFuture.toString()) 275 .matches("[^\\[]+\\[status=SUCCESS, result=\\[java.lang.Boolean@\\w+\\]\\]"); 276 } 277 278 /** 279 * This test attempts to cause a future to wait for longer than it was requested to from a timed 280 * get() call. As measurements of time are prone to flakiness, it tries to assert based on ranges 281 * derived from observing how much time actually passed for various operations. 282 */ 283 @SuppressWarnings("ThreadPriorityCheck") 284 @AndroidIncompatible // Thread.suspend testToString_delayedTimeout()285 public void testToString_delayedTimeout() throws Exception { 286 Integer javaVersion = Ints.tryParse(JAVA_SPECIFICATION_VERSION.value()); 287 // Parsing to an integer might fail because Java 8 returns "1.8" instead of "8." 288 // We can continue if it's 1.8, and we can continue if it's an integer in [9, 20). 289 if (javaVersion != null && javaVersion >= 20) { 290 // TODO(b/261217224, b/361604053): Make this test work under newer JDKs. 291 return; 292 } 293 TimedWaiterThread thread = 294 new TimedWaiterThread(new AbstractFuture<Object>() {}, 2, TimeUnit.SECONDS); 295 thread.start(); 296 thread.awaitWaiting(); 297 Thread.class.getMethod("suspend").invoke(thread); 298 // Sleep for enough time to add 1500 milliseconds of overwait to the get() call. 299 long toWaitMillis = 3500 - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - thread.startTime); 300 Thread.sleep(toWaitMillis); 301 thread.setPriority(Thread.MAX_PRIORITY); 302 Thread.class.getMethod("resume").invoke(thread); 303 thread.join(); 304 // It's possible to race and suspend the thread just before the park call actually takes effect, 305 // causing the thread to be suspended for 3.5 seconds, and then park itself for 2 seconds after 306 // being resumed. To avoid a flake in this scenario, calculate how long that thread actually 307 // waited and assert based on that time. Empirically, the race where the thread ends up waiting 308 // for 5.5 seconds happens about 2% of the time. 309 boolean longWait = TimeUnit.NANOSECONDS.toSeconds(thread.timeSpentBlocked) >= 5; 310 // Count how long it actually took to return; we'll accept any number between the expected delay 311 // and the approximate actual delay, to be robust to variance in thread scheduling. 312 char overWaitNanosFirstDigit = 313 Long.toString( 314 thread.timeSpentBlocked - TimeUnit.MILLISECONDS.toNanos(longWait ? 5000 : 3000)) 315 .charAt(0); 316 if (overWaitNanosFirstDigit < '4') { 317 overWaitNanosFirstDigit = '9'; 318 } 319 String nanosRegex = "[4-" + overWaitNanosFirstDigit + "][0-9]+"; 320 assertWithMessage( 321 "Spent " + thread.timeSpentBlocked + " ns blocked; slept for " + toWaitMillis + " ms") 322 .that(thread.exception) 323 .hasMessageThat() 324 .matches( 325 "Waited 2 seconds \\(plus " 326 + (longWait ? "3" : "1") 327 + " seconds, " 328 + nanosRegex 329 + " nanoseconds delay\\).*"); 330 } 331 testToString_completed()332 public void testToString_completed() throws Exception { 333 AbstractFuture<Object> testFuture2 = 334 new AbstractFuture<Object>() { 335 @Override 336 public String pendingToString() { 337 return "cause=[Someday...]"; 338 } 339 }; 340 AbstractFuture<Object> testFuture3 = new AbstractFuture<Object>() {}; 341 testFuture3.setFuture(testFuture2); 342 assertThat(testFuture3.toString()) 343 .matches( 344 "[^\\[]+\\[status=PENDING, setFuture=\\[[^\\[]+\\[status=PENDING," 345 + " info=\\[cause=\\[Someday...]]]]]"); 346 testFuture2.set("result string"); 347 assertThat(testFuture3.toString()) 348 .matches("[^\\[]+\\[status=SUCCESS, result=\\[java.lang.String@\\w+\\]\\]"); 349 } 350 testToString_cancelled()351 public void testToString_cancelled() throws Exception { 352 assertThat(Futures.immediateCancelledFuture().toString()) 353 .matches("[^\\[]+\\[status=CANCELLED\\]"); 354 } 355 testToString_failed()356 public void testToString_failed() { 357 assertThat(Futures.immediateFailedFuture(new RuntimeException("foo")).toString()) 358 .matches("[^\\[]+\\[status=FAILURE, cause=\\[java.lang.RuntimeException: foo\\]\\]"); 359 } 360 testToString_misbehaving()361 public void testToString_misbehaving() throws Exception { 362 assertThat( 363 new AbstractFuture<Object>() { 364 @Override 365 public String pendingToString() { 366 throw new RuntimeException("I'm a misbehaving implementation"); 367 } 368 }.toString()) 369 .matches( 370 "[^\\[]+\\[status=PENDING, info=\\[Exception thrown from implementation: " 371 + "class java.lang.RuntimeException\\]\\]"); 372 } 373 testCompletionFinishesWithDone()374 public void testCompletionFinishesWithDone() { 375 ExecutorService executor = Executors.newFixedThreadPool(10); 376 for (int i = 0; i < 50000; i++) { 377 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 378 final AtomicReference<String> errorMessage = Atomics.newReference(); 379 executor.execute( 380 new Runnable() { 381 @Override 382 public void run() { 383 future.set("success"); 384 if (!future.isDone()) { 385 errorMessage.set("Set call exited before future was complete."); 386 } 387 } 388 }); 389 executor.execute( 390 new Runnable() { 391 @Override 392 public void run() { 393 future.setException(new IllegalArgumentException("failure")); 394 if (!future.isDone()) { 395 errorMessage.set("SetException call exited before future was complete."); 396 } 397 } 398 }); 399 executor.execute( 400 new Runnable() { 401 @Override 402 public void run() { 403 future.cancel(true); 404 if (!future.isDone()) { 405 errorMessage.set("Cancel call exited before future was complete."); 406 } 407 } 408 }); 409 try { 410 future.get(); 411 } catch (Throwable t) { 412 // Ignore, we just wanted to block. 413 } 414 String error = errorMessage.get(); 415 assertNull(error, error); 416 } 417 executor.shutdown(); 418 } 419 420 /** 421 * He did the bash, he did the future bash The future bash, it was a concurrency smash He did the 422 * bash, it caught on in a flash He did the bash, he did the future bash 423 */ 424 testFutureBash()425 public void testFutureBash() { 426 if (isWindows()) { 427 return; // TODO: b/136041958 - Running very slowly on Windows CI. 428 } 429 final CyclicBarrier barrier = 430 new CyclicBarrier( 431 6 // for the setter threads 432 + 50 // for the listeners 433 + 50 // for the blocking get threads, 434 + 1); // for the main thread 435 final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); 436 final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference(); 437 final AtomicInteger numSuccessfulSetCalls = new AtomicInteger(); 438 Callable<@Nullable Void> completeSuccessfullyRunnable = 439 new Callable<@Nullable Void>() { 440 @Override 441 public @Nullable Void call() { 442 if (currentFuture.get().set("set")) { 443 numSuccessfulSetCalls.incrementAndGet(); 444 } 445 awaitUnchecked(barrier); 446 return null; 447 } 448 }; 449 Callable<@Nullable Void> completeExceptionallyRunnable = 450 new Callable<@Nullable Void>() { 451 Exception failureCause = new Exception("setException"); 452 453 @Override 454 public @Nullable Void call() { 455 if (currentFuture.get().setException(failureCause)) { 456 numSuccessfulSetCalls.incrementAndGet(); 457 } 458 awaitUnchecked(barrier); 459 return null; 460 } 461 }; 462 Callable<@Nullable Void> cancelRunnable = 463 new Callable<@Nullable Void>() { 464 @Override 465 public @Nullable Void call() { 466 if (currentFuture.get().cancel(true)) { 467 numSuccessfulSetCalls.incrementAndGet(); 468 } 469 awaitUnchecked(barrier); 470 return null; 471 } 472 }; 473 Callable<@Nullable Void> setFutureCompleteSuccessfullyRunnable = 474 new Callable<@Nullable Void>() { 475 ListenableFuture<String> future = Futures.immediateFuture("setFuture"); 476 477 @Override 478 public @Nullable Void call() { 479 if (currentFuture.get().setFuture(future)) { 480 numSuccessfulSetCalls.incrementAndGet(); 481 } 482 awaitUnchecked(barrier); 483 return null; 484 } 485 }; 486 Callable<@Nullable Void> setFutureCompleteExceptionallyRunnable = 487 new Callable<@Nullable Void>() { 488 ListenableFuture<String> future = 489 Futures.immediateFailedFuture(new Exception("setFuture")); 490 491 @Override 492 public @Nullable Void call() { 493 if (currentFuture.get().setFuture(future)) { 494 numSuccessfulSetCalls.incrementAndGet(); 495 } 496 awaitUnchecked(barrier); 497 return null; 498 } 499 }; 500 Callable<@Nullable Void> setFutureCancelRunnable = 501 new Callable<@Nullable Void>() { 502 ListenableFuture<String> future = Futures.immediateCancelledFuture(); 503 504 @Override 505 public @Nullable Void call() { 506 if (currentFuture.get().setFuture(future)) { 507 numSuccessfulSetCalls.incrementAndGet(); 508 } 509 awaitUnchecked(barrier); 510 return null; 511 } 512 }; 513 final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); 514 Runnable collectResultsRunnable = 515 new Runnable() { 516 @Override 517 public void run() { 518 try { 519 String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); 520 finalResults.add(result); 521 } catch (ExecutionException e) { 522 finalResults.add(e.getCause()); 523 } catch (CancellationException e) { 524 finalResults.add(CancellationException.class); 525 } finally { 526 awaitUnchecked(barrier); 527 } 528 } 529 }; 530 Runnable collectResultsTimedGetRunnable = 531 new Runnable() { 532 @Override 533 public void run() { 534 Future<String> future = currentFuture.get(); 535 while (true) { 536 try { 537 String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS); 538 finalResults.add(result); 539 break; 540 } catch (ExecutionException e) { 541 finalResults.add(e.getCause()); 542 break; 543 } catch (CancellationException e) { 544 finalResults.add(CancellationException.class); 545 break; 546 } catch (TimeoutException e) { 547 // loop 548 } 549 } 550 awaitUnchecked(barrier); 551 } 552 }; 553 List<Callable<?>> allTasks = new ArrayList<>(); 554 allTasks.add(completeSuccessfullyRunnable); 555 allTasks.add(completeExceptionallyRunnable); 556 allTasks.add(cancelRunnable); 557 allTasks.add(setFutureCompleteSuccessfullyRunnable); 558 allTasks.add(setFutureCompleteExceptionallyRunnable); 559 allTasks.add(setFutureCancelRunnable); 560 for (int k = 0; k < 50; k++) { 561 // For each listener we add a task that submits it to the executor directly for the blocking 562 // get use case and another task that adds it as a listener to the future to exercise both 563 // racing addListener calls and addListener calls completing after the future completes. 564 final Runnable listener = 565 k % 2 == 0 ? collectResultsRunnable : collectResultsTimedGetRunnable; 566 allTasks.add(Executors.callable(listener)); 567 allTasks.add( 568 new Callable<@Nullable Void>() { 569 @Override 570 public @Nullable Void call() throws Exception { 571 currentFuture.get().addListener(listener, executor); 572 return null; 573 } 574 }); 575 } 576 assertEquals(allTasks.size() + 1, barrier.getParties()); 577 for (int i = 0; i < 1000; i++) { 578 Collections.shuffle(allTasks); 579 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 580 currentFuture.set(future); 581 for (Callable<?> task : allTasks) { 582 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 583 Future<?> possiblyIgnoredError = executor.submit(task); 584 } 585 awaitUnchecked(barrier); 586 assertThat(future.isDone()).isTrue(); 587 // inspect state and ensure it is correct! 588 // asserts that all get calling threads received the same value 589 Object result = Iterables.getOnlyElement(finalResults); 590 if (result == CancellationException.class) { 591 assertTrue(future.isCancelled()); 592 if (future.wasInterrupted()) { 593 // We were cancelled, it is possible that setFuture could have succeeded too. 594 assertThat(numSuccessfulSetCalls.get()).isIn(Range.closed(1, 2)); 595 } else { 596 assertThat(numSuccessfulSetCalls.get()).isEqualTo(1); 597 } 598 } else { 599 assertThat(numSuccessfulSetCalls.get()).isEqualTo(1); 600 } 601 // reset for next iteration 602 numSuccessfulSetCalls.set(0); 603 finalResults.clear(); 604 } 605 executor.shutdown(); 606 } 607 608 // setFuture and cancel() interact in more complicated ways than the other setters. testSetFutureCancelBash()609 public void testSetFutureCancelBash() { 610 if (isWindows()) { 611 return; // TODO: b/136041958 - Running very slowly on Windows CI. 612 } 613 final int size = 50; 614 final CyclicBarrier barrier = 615 new CyclicBarrier( 616 2 // for the setter threads 617 + size // for the listeners 618 + size // for the get threads, 619 + 1); // for the main thread 620 final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); 621 final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference(); 622 final AtomicReference<AbstractFuture<String>> setFutureFuture = Atomics.newReference(); 623 final AtomicBoolean setFutureSetSuccess = new AtomicBoolean(); 624 final AtomicBoolean setFutureCompletionSuccess = new AtomicBoolean(); 625 final AtomicBoolean cancellationSuccess = new AtomicBoolean(); 626 Runnable cancelRunnable = 627 new Runnable() { 628 @Override 629 public void run() { 630 cancellationSuccess.set(currentFuture.get().cancel(true)); 631 awaitUnchecked(barrier); 632 } 633 }; 634 Runnable setFutureCompleteSuccessfullyRunnable = 635 new Runnable() { 636 @Override 637 public void run() { 638 AbstractFuture<String> future = setFutureFuture.get(); 639 setFutureSetSuccess.set(currentFuture.get().setFuture(future)); 640 setFutureCompletionSuccess.set(future.set("hello-async-world")); 641 awaitUnchecked(barrier); 642 } 643 }; 644 final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); 645 Runnable collectResultsRunnable = 646 new Runnable() { 647 @Override 648 public void run() { 649 try { 650 String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); 651 finalResults.add(result); 652 } catch (ExecutionException e) { 653 finalResults.add(e.getCause()); 654 } catch (CancellationException e) { 655 finalResults.add(CancellationException.class); 656 } finally { 657 awaitUnchecked(barrier); 658 } 659 } 660 }; 661 Runnable collectResultsTimedGetRunnable = 662 new Runnable() { 663 @Override 664 public void run() { 665 Future<String> future = currentFuture.get(); 666 while (true) { 667 try { 668 String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS); 669 finalResults.add(result); 670 break; 671 } catch (ExecutionException e) { 672 finalResults.add(e.getCause()); 673 break; 674 } catch (CancellationException e) { 675 finalResults.add(CancellationException.class); 676 break; 677 } catch (TimeoutException e) { 678 // loop 679 } 680 } 681 awaitUnchecked(barrier); 682 } 683 }; 684 List<Runnable> allTasks = new ArrayList<>(); 685 allTasks.add(cancelRunnable); 686 allTasks.add(setFutureCompleteSuccessfullyRunnable); 687 for (int k = 0; k < size; k++) { 688 // For each listener we add a task that submits it to the executor directly for the blocking 689 // get use case and another task that adds it as a listener to the future to exercise both 690 // racing addListener calls and addListener calls completing after the future completes. 691 final Runnable listener = 692 k % 2 == 0 ? collectResultsRunnable : collectResultsTimedGetRunnable; 693 allTasks.add(listener); 694 allTasks.add( 695 new Runnable() { 696 @Override 697 public void run() { 698 currentFuture.get().addListener(listener, executor); 699 } 700 }); 701 } 702 assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check 703 for (int i = 0; i < 1000; i++) { 704 Collections.shuffle(allTasks); 705 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 706 final AbstractFuture<String> setFuture = new AbstractFuture<String>() {}; 707 currentFuture.set(future); 708 setFutureFuture.set(setFuture); 709 for (Runnable task : allTasks) { 710 executor.execute(task); 711 } 712 awaitUnchecked(barrier); 713 assertThat(future.isDone()).isTrue(); 714 // inspect state and ensure it is correct! 715 // asserts that all get calling threads received the same value 716 Object result = Iterables.getOnlyElement(finalResults); 717 if (result == CancellationException.class) { 718 assertTrue(future.isCancelled()); 719 assertTrue(cancellationSuccess.get()); 720 // cancellation can interleave in 3 ways 721 // 1. prior to setFuture 722 // 2. after setFuture before set() on the future assigned 723 // 3. after setFuture and set() are called but before the listener completes. 724 if (!setFutureSetSuccess.get() || !setFutureCompletionSuccess.get()) { 725 // If setFuture fails or set on the future fails then it must be because that future was 726 // cancelled 727 assertTrue(setFuture.isCancelled()); 728 assertTrue(setFuture.wasInterrupted()); // we only call cancel(true) 729 } 730 } else { 731 // set on the future completed 732 assertFalse(cancellationSuccess.get()); 733 assertTrue(setFutureSetSuccess.get()); 734 assertTrue(setFutureCompletionSuccess.get()); 735 } 736 // reset for next iteration 737 setFutureSetSuccess.set(false); 738 setFutureCompletionSuccess.set(false); 739 cancellationSuccess.set(false); 740 finalResults.clear(); 741 } 742 executor.shutdown(); 743 } 744 745 // Test to ensure that when calling setFuture with a done future only setFuture or cancel can 746 // return true. testSetFutureCancelBash_withDoneFuture()747 public void testSetFutureCancelBash_withDoneFuture() { 748 final CyclicBarrier barrier = 749 new CyclicBarrier( 750 2 // for the setter threads 751 + 1 // for the blocking get thread, 752 + 1); // for the main thread 753 final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); 754 final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference(); 755 final AtomicBoolean setFutureSuccess = new AtomicBoolean(); 756 final AtomicBoolean cancellationSuccess = new AtomicBoolean(); 757 Callable<@Nullable Void> cancelRunnable = 758 new Callable<@Nullable Void>() { 759 @Override 760 public @Nullable Void call() { 761 cancellationSuccess.set(currentFuture.get().cancel(true)); 762 awaitUnchecked(barrier); 763 return null; 764 } 765 }; 766 Callable<@Nullable Void> setFutureCompleteSuccessfullyRunnable = 767 new Callable<@Nullable Void>() { 768 final ListenableFuture<String> future = Futures.immediateFuture("hello"); 769 770 @Override 771 public @Nullable Void call() { 772 setFutureSuccess.set(currentFuture.get().setFuture(future)); 773 awaitUnchecked(barrier); 774 return null; 775 } 776 }; 777 final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); 778 final Runnable collectResultsRunnable = 779 new Runnable() { 780 @Override 781 public void run() { 782 try { 783 String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); 784 finalResults.add(result); 785 } catch (ExecutionException e) { 786 finalResults.add(e.getCause()); 787 } catch (CancellationException e) { 788 finalResults.add(CancellationException.class); 789 } finally { 790 awaitUnchecked(barrier); 791 } 792 } 793 }; 794 List<Callable<?>> allTasks = new ArrayList<>(); 795 allTasks.add(cancelRunnable); 796 allTasks.add(setFutureCompleteSuccessfullyRunnable); 797 allTasks.add(Executors.callable(collectResultsRunnable)); 798 assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check 799 for (int i = 0; i < 1000; i++) { 800 Collections.shuffle(allTasks); 801 final AbstractFuture<String> future = new AbstractFuture<String>() {}; 802 currentFuture.set(future); 803 for (Callable<?> task : allTasks) { 804 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 805 Future<?> possiblyIgnoredError = executor.submit(task); 806 } 807 awaitUnchecked(barrier); 808 assertThat(future.isDone()).isTrue(); 809 // inspect state and ensure it is correct! 810 // asserts that all get calling threads received the same value 811 Object result = Iterables.getOnlyElement(finalResults); 812 if (result == CancellationException.class) { 813 assertTrue(future.isCancelled()); 814 assertTrue(cancellationSuccess.get()); 815 assertFalse(setFutureSuccess.get()); 816 } else { 817 assertTrue(setFutureSuccess.get()); 818 assertFalse(cancellationSuccess.get()); 819 } 820 // reset for next iteration 821 setFutureSuccess.set(false); 822 cancellationSuccess.set(false); 823 finalResults.clear(); 824 } 825 executor.shutdown(); 826 } 827 828 // In a previous implementation this would cause a stack overflow after ~2000 futures chained 829 // together. Now it should only be limited by available memory (and time) testSetFuture_stackOverflow()830 public void testSetFuture_stackOverflow() { 831 SettableFuture<String> orig = SettableFuture.create(); 832 SettableFuture<String> prev = orig; 833 for (int i = 0; i < 100000; i++) { 834 SettableFuture<String> curr = SettableFuture.create(); 835 prev.setFuture(curr); 836 prev = curr; 837 } 838 // prev represents the 'innermost' future 839 prev.set("done"); 840 assertTrue(orig.isDone()); 841 } 842 843 // Verify that StackOverflowError in a long chain of SetFuture doesn't cause the entire toString 844 // call to fail 845 @J2ktIncompatible 846 @GwtIncompatible 847 @AndroidIncompatible testSetFutureToString_stackOverflow()848 public void testSetFutureToString_stackOverflow() { 849 SettableFuture<String> orig = SettableFuture.create(); 850 SettableFuture<String> prev = orig; 851 for (int i = 0; i < 100000; i++) { 852 SettableFuture<String> curr = SettableFuture.create(); 853 prev.setFuture(curr); 854 prev = curr; 855 } 856 // orig represents the 'outermost' future 857 assertThat(orig.toString()) 858 .contains("Exception thrown from implementation: class java.lang.StackOverflowError"); 859 } 860 testSetFuture_misbehavingFutureThrows()861 public void testSetFuture_misbehavingFutureThrows() throws Exception { 862 SettableFuture<String> future = SettableFuture.create(); 863 ListenableFuture<String> badFuture = 864 new ListenableFuture<String>() { 865 @Override 866 public boolean cancel(boolean interrupt) { 867 return false; 868 } 869 870 @Override 871 public boolean isDone() { 872 return true; 873 } 874 875 @Override 876 public boolean isCancelled() { 877 return false; // BAD!! 878 } 879 880 @Override 881 public String get() { 882 throw new CancellationException(); // BAD!! 883 } 884 885 @Override 886 public String get(long time, TimeUnit unit) { 887 throw new CancellationException(); // BAD!! 888 } 889 890 @Override 891 public void addListener(Runnable runnable, Executor executor) { 892 executor.execute(runnable); 893 } 894 }; 895 future.setFuture(badFuture); 896 ExecutionException expected = getExpectingExecutionException(future); 897 assertThat(expected).hasCauseThat().isInstanceOf(IllegalArgumentException.class); 898 assertThat(expected).hasCauseThat().hasMessageThat().contains(badFuture.toString()); 899 } 900 testSetFuture_misbehavingFutureDoesNotThrow()901 public void testSetFuture_misbehavingFutureDoesNotThrow() throws Exception { 902 SettableFuture<String> future = SettableFuture.create(); 903 ListenableFuture<String> badFuture = 904 new ListenableFuture<String>() { 905 @Override 906 public boolean cancel(boolean interrupt) { 907 return false; 908 } 909 910 @Override 911 public boolean isDone() { 912 return true; 913 } 914 915 @Override 916 public boolean isCancelled() { 917 return true; // BAD!! 918 } 919 920 @Override 921 public String get() { 922 return "foo"; // BAD!! 923 } 924 925 @Override 926 public String get(long time, TimeUnit unit) { 927 return "foo"; // BAD!! 928 } 929 930 @Override 931 public void addListener(Runnable runnable, Executor executor) { 932 executor.execute(runnable); 933 } 934 }; 935 future.setFuture(badFuture); 936 assertThat(future.isCancelled()).isTrue(); 937 } 938 testCancel_stackOverflow()939 public void testCancel_stackOverflow() { 940 SettableFuture<String> orig = SettableFuture.create(); 941 SettableFuture<String> prev = orig; 942 for (int i = 0; i < 100000; i++) { 943 SettableFuture<String> curr = SettableFuture.create(); 944 prev.setFuture(curr); 945 prev = curr; 946 } 947 // orig is the 'outermost future', this should propagate fully down the stack of futures. 948 orig.cancel(true); 949 assertTrue(orig.isCancelled()); 950 assertTrue(prev.isCancelled()); 951 assertTrue(prev.wasInterrupted()); 952 } 953 testSetFutureSelf_cancel()954 public void testSetFutureSelf_cancel() { 955 SettableFuture<String> orig = SettableFuture.create(); 956 orig.setFuture(orig); 957 orig.cancel(true); 958 assertTrue(orig.isCancelled()); 959 } 960 testSetFutureSelf_toString()961 public void testSetFutureSelf_toString() { 962 SettableFuture<String> orig = SettableFuture.create(); 963 orig.setFuture(orig); 964 assertThat(orig.toString()).contains("[status=PENDING, setFuture=[this future]]"); 965 } 966 testSetSelf_toString()967 public void testSetSelf_toString() { 968 SettableFuture<Object> orig = SettableFuture.create(); 969 orig.set(orig); 970 assertThat(orig.toString()).contains("[status=SUCCESS, result=[this future]]"); 971 } 972 testSetFutureSelf_toStringException()973 public void testSetFutureSelf_toStringException() { 974 SettableFuture<String> orig = SettableFuture.create(); 975 orig.setFuture( 976 new AbstractFuture<String>() { 977 @Override 978 public String toString() { 979 throw new NullPointerException(); 980 } 981 }); 982 assertThat(orig.toString()) 983 .contains( 984 "[status=PENDING, setFuture=[Exception thrown from implementation: class" 985 + " java.lang.NullPointerException]]"); 986 } 987 testSetIndirectSelf_toString()988 public void testSetIndirectSelf_toString() { 989 final SettableFuture<Object> orig = SettableFuture.create(); 990 // unlike the above this indirection defeats the trivial cycle detection and causes a SOE 991 orig.setFuture( 992 new ForwardingListenableFuture<Object>() { 993 @Override 994 protected ListenableFuture<Object> delegate() { 995 return orig; 996 } 997 }); 998 assertThat(orig.toString()) 999 .contains("Exception thrown from implementation: class java.lang.StackOverflowError"); 1000 } 1001 1002 // Regression test for a case where we would fail to execute listeners immediately on done futures 1003 // this would be observable from an afterDone callback testListenersExecuteImmediately_fromAfterDone()1004 public void testListenersExecuteImmediately_fromAfterDone() { 1005 AbstractFuture<String> f = 1006 new AbstractFuture<String>() { 1007 @Override 1008 protected void afterDone() { 1009 final AtomicBoolean ranImmediately = new AtomicBoolean(); 1010 addListener( 1011 new Runnable() { 1012 @Override 1013 public void run() { 1014 ranImmediately.set(true); 1015 } 1016 }, 1017 MoreExecutors.directExecutor()); 1018 assertThat(ranImmediately.get()).isTrue(); 1019 } 1020 }; 1021 f.set("foo"); 1022 } 1023 1024 // Regression test for a case where we would fail to execute listeners immediately on done futures 1025 // this would be observable from a waiter that was just unblocked. testListenersExecuteImmediately_afterWaiterWakesUp()1026 public void testListenersExecuteImmediately_afterWaiterWakesUp() throws Exception { 1027 final AbstractFuture<String> f = 1028 new AbstractFuture<String>() { 1029 @Override 1030 protected void afterDone() { 1031 // this simply delays executing listeners 1032 try { 1033 Thread.sleep(TimeUnit.SECONDS.toMillis(10)); 1034 } catch (InterruptedException ignored) { 1035 Thread.currentThread().interrupt(); // preserve status 1036 } 1037 } 1038 }; 1039 Thread t = 1040 new Thread() { 1041 @Override 1042 public void run() { 1043 f.set("foo"); 1044 } 1045 }; 1046 t.start(); 1047 f.get(); 1048 final AtomicBoolean ranImmediately = new AtomicBoolean(); 1049 f.addListener( 1050 new Runnable() { 1051 @Override 1052 public void run() { 1053 ranImmediately.set(true); 1054 } 1055 }, 1056 MoreExecutors.directExecutor()); 1057 assertThat(ranImmediately.get()).isTrue(); 1058 t.interrupt(); 1059 t.join(); 1060 } 1061 testCatchesUndeclaredThrowableFromListener()1062 public void testCatchesUndeclaredThrowableFromListener() { 1063 AbstractFuture<String> f = new AbstractFuture<String>() {}; 1064 f.set("foo"); 1065 f.addListener(() -> sneakyThrow(new SomeCheckedException()), directExecutor()); 1066 } 1067 1068 private static final class SomeCheckedException extends Exception {} 1069 1070 /** Throws an undeclared checked exception. */ sneakyThrow(Throwable t)1071 private static void sneakyThrow(Throwable t) { 1072 class SneakyThrower<T extends Throwable> { 1073 @SuppressWarnings("unchecked") // intentionally unsafe for test 1074 void throwIt(Throwable t) throws T { 1075 throw (T) t; 1076 } 1077 } 1078 new SneakyThrower<Error>().throwIt(t); 1079 } 1080 testTrustedGetFailure_completed()1081 public void testTrustedGetFailure_completed() { 1082 SettableFuture<String> future = SettableFuture.create(); 1083 future.set("261"); 1084 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1085 } 1086 testTrustedGetFailure_failed()1087 public void testTrustedGetFailure_failed() { 1088 SettableFuture<String> future = SettableFuture.create(); 1089 Throwable failure = new Throwable(); 1090 future.setException(failure); 1091 assertThat(future.tryInternalFastPathGetFailure()).isEqualTo(failure); 1092 } 1093 testTrustedGetFailure_notCompleted()1094 public void testTrustedGetFailure_notCompleted() { 1095 SettableFuture<String> future = SettableFuture.create(); 1096 assertThat(future.isDone()).isFalse(); 1097 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1098 } 1099 testTrustedGetFailure_canceledNoCause()1100 public void testTrustedGetFailure_canceledNoCause() { 1101 SettableFuture<String> future = SettableFuture.create(); 1102 future.cancel(false); 1103 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1104 } 1105 testGetFailure_completed()1106 public void testGetFailure_completed() { 1107 AbstractFuture<String> future = new AbstractFuture<String>() {}; 1108 future.set("261"); 1109 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1110 } 1111 testGetFailure_failed()1112 public void testGetFailure_failed() { 1113 AbstractFuture<String> future = new AbstractFuture<String>() {}; 1114 final Throwable failure = new Throwable(); 1115 future.setException(failure); 1116 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1117 } 1118 testGetFailure_notCompleted()1119 public void testGetFailure_notCompleted() { 1120 AbstractFuture<String> future = new AbstractFuture<String>() {}; 1121 assertThat(future.isDone()).isFalse(); 1122 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1123 } 1124 testGetFailure_canceledNoCause()1125 public void testGetFailure_canceledNoCause() { 1126 AbstractFuture<String> future = new AbstractFuture<String>() {}; 1127 future.cancel(false); 1128 assertThat(future.tryInternalFastPathGetFailure()).isNull(); 1129 } 1130 testForwardExceptionFastPath()1131 public void testForwardExceptionFastPath() throws Exception { 1132 class FailFuture extends InternalFutureFailureAccess implements ListenableFuture<String> { 1133 Throwable failure; 1134 1135 FailFuture(Throwable throwable) { 1136 failure = throwable; 1137 } 1138 1139 @Override 1140 public boolean cancel(boolean mayInterruptIfRunning) { 1141 throw new AssertionFailedError("cancel shouldn't be called on this object"); 1142 } 1143 1144 @Override 1145 public boolean isCancelled() { 1146 return false; 1147 } 1148 1149 @Override 1150 public boolean isDone() { 1151 return true; 1152 } 1153 1154 @Override 1155 public String get() throws InterruptedException, ExecutionException { 1156 throw new AssertionFailedError("get() shouldn't be called on this object"); 1157 } 1158 1159 @Override 1160 public String get(long timeout, TimeUnit unit) 1161 throws InterruptedException, ExecutionException, TimeoutException { 1162 return get(); 1163 } 1164 1165 @Override 1166 protected Throwable tryInternalFastPathGetFailure() { 1167 return failure; 1168 } 1169 1170 @Override 1171 public void addListener(Runnable listener, Executor executor) { 1172 throw new AssertionFailedError("addListener() shouldn't be called on this object"); 1173 } 1174 } 1175 1176 final RuntimeException exception = new RuntimeException("you still didn't say the magic word!"); 1177 SettableFuture<String> normalFuture = SettableFuture.create(); 1178 normalFuture.setFuture(new FailFuture(exception)); 1179 assertTrue(normalFuture.isDone()); 1180 ExecutionException e = assertThrows(ExecutionException.class, () -> normalFuture.get()); 1181 assertSame(exception, e.getCause()); 1182 } 1183 awaitUnchecked(final CyclicBarrier barrier)1184 private static void awaitUnchecked(final CyclicBarrier barrier) { 1185 try { 1186 barrier.await(); 1187 } catch (Exception e) { 1188 throw new RuntimeException(e); 1189 } 1190 } 1191 checkStackTrace(ExecutionException e)1192 private void checkStackTrace(ExecutionException e) { 1193 // Our call site for get() should be in the trace. 1194 int index = findStackFrame(e, getClass().getName(), "getExpectingExecutionException"); 1195 1196 assertThat(index).isNotEqualTo(0); 1197 1198 // Above our method should be the call to get(). Don't assert on the class 1199 // because it could be some superclass. 1200 assertThat(e.getStackTrace()[index - 1].getMethodName()).isEqualTo("get"); 1201 } 1202 findStackFrame(ExecutionException e, String clazz, String method)1203 private static int findStackFrame(ExecutionException e, String clazz, String method) { 1204 StackTraceElement[] elements = e.getStackTrace(); 1205 for (int i = 0; i < elements.length; i++) { 1206 StackTraceElement element = elements[i]; 1207 if (element.getClassName().equals(clazz) && element.getMethodName().equals(method)) { 1208 return i; 1209 } 1210 } 1211 throw new AssertionError( 1212 "Expected element " + clazz + "." + method + " not found in stack trace", e); 1213 } 1214 getExpectingExecutionException(AbstractFuture<String> future)1215 private ExecutionException getExpectingExecutionException(AbstractFuture<String> future) 1216 throws InterruptedException { 1217 try { 1218 String got = future.get(); 1219 throw new AssertionError("Expected exception but got " + got); 1220 } catch (ExecutionException e) { 1221 return e; 1222 } 1223 } 1224 1225 private static final class WaiterThread extends Thread { 1226 private final AbstractFuture<?> future; 1227 WaiterThread(AbstractFuture<?> future)1228 private WaiterThread(AbstractFuture<?> future) { 1229 this.future = future; 1230 } 1231 1232 @Override run()1233 public void run() { 1234 try { 1235 future.get(); 1236 } catch (Exception e) { 1237 // nothing 1238 } 1239 } 1240 awaitWaiting()1241 void awaitWaiting() { 1242 while (!isBlocked()) { 1243 if (getState() == State.TERMINATED) { 1244 throw new RuntimeException("Thread exited"); 1245 } 1246 Thread.yield(); 1247 } 1248 } 1249 isBlocked()1250 private boolean isBlocked() { 1251 return getState() == Thread.State.WAITING && LockSupport.getBlocker(this) == future; 1252 } 1253 } 1254 1255 static final class TimedWaiterThread extends Thread { 1256 private final AbstractFuture<?> future; 1257 private final long timeout; 1258 private final TimeUnit unit; 1259 private Exception exception; 1260 private volatile long startTime; 1261 private long timeSpentBlocked; 1262 TimedWaiterThread(AbstractFuture<?> future, long timeout, TimeUnit unit)1263 TimedWaiterThread(AbstractFuture<?> future, long timeout, TimeUnit unit) { 1264 this.future = future; 1265 this.timeout = timeout; 1266 this.unit = unit; 1267 } 1268 1269 @Override run()1270 public void run() { 1271 startTime = System.nanoTime(); 1272 try { 1273 future.get(timeout, unit); 1274 } catch (Exception e) { 1275 // nothing 1276 exception = e; 1277 } finally { 1278 timeSpentBlocked = System.nanoTime() - startTime; 1279 } 1280 } 1281 awaitWaiting()1282 void awaitWaiting() { 1283 while (!isBlocked()) { 1284 if (getState() == State.TERMINATED) { 1285 throw new RuntimeException("Thread exited"); 1286 } 1287 Thread.yield(); 1288 } 1289 } 1290 isBlocked()1291 private boolean isBlocked() { 1292 return getState() == Thread.State.TIMED_WAITING && LockSupport.getBlocker(this) == future; 1293 } 1294 } 1295 1296 private static final class PollingThread extends Thread { 1297 private final AbstractFuture<?> future; 1298 private final CountDownLatch completedIteration = new CountDownLatch(10); 1299 PollingThread(AbstractFuture<?> future)1300 private PollingThread(AbstractFuture<?> future) { 1301 this.future = future; 1302 } 1303 1304 @Override run()1305 public void run() { 1306 while (true) { 1307 try { 1308 future.get(0, TimeUnit.SECONDS); 1309 return; 1310 } catch (InterruptedException | ExecutionException e) { 1311 return; 1312 } catch (TimeoutException e) { 1313 // do nothing 1314 } finally { 1315 completedIteration.countDown(); 1316 } 1317 } 1318 } 1319 awaitInLoop()1320 void awaitInLoop() { 1321 Uninterruptibles.awaitUninterruptibly(completedIteration); 1322 } 1323 } 1324 1325 private static final class InterruptibleFuture extends AbstractFuture<String> { 1326 boolean interruptTaskWasCalled; 1327 1328 @Override interruptTask()1329 protected void interruptTask() { 1330 assertFalse(interruptTaskWasCalled); 1331 interruptTaskWasCalled = true; 1332 } 1333 } 1334 isWindows()1335 private static boolean isWindows() { 1336 return OS_NAME.value().startsWith("Windows"); 1337 } 1338 } 1339