/* * Copyright (C) 2011 The Guava Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.google.common.util.concurrent; import static com.google.common.base.StandardSystemProperty.JAVA_SPECIFICATION_VERSION; import static com.google.common.base.StandardSystemProperty.OS_NAME; import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static org.junit.Assert.assertThrows; import com.google.common.annotations.GwtIncompatible; import com.google.common.collect.Iterables; import com.google.common.collect.Range; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.internal.InternalFutureFailureAccess; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import junit.framework.AssertionFailedError; import junit.framework.TestCase; import org.checkerframework.checker.nullness.qual.Nullable; /** * Tests for {@link AbstractFuture}. * * @author Brian Stoler */ public class AbstractFutureTest extends TestCase { public void testSuccess() throws ExecutionException, InterruptedException { final Object value = new Object(); assertSame( value, new AbstractFuture() { { set(value); } }.get()); } public void testException() throws InterruptedException { final Throwable failure = new Throwable(); AbstractFuture future = new AbstractFuture() { { setException(failure); } }; ExecutionException ee1 = getExpectingExecutionException(future); ExecutionException ee2 = getExpectingExecutionException(future); // Ensure we get a unique execution exception on each get assertNotSame(ee1, ee2); assertThat(ee1).hasCauseThat().isSameInstanceAs(failure); assertThat(ee2).hasCauseThat().isSameInstanceAs(failure); checkStackTrace(ee1); checkStackTrace(ee2); } public void testCancel_notDoneNoInterrupt() throws Exception { InterruptibleFuture future = new InterruptibleFuture(); assertTrue(future.cancel(false)); assertTrue(future.isCancelled()); assertTrue(future.isDone()); assertFalse(future.wasInterrupted()); assertFalse(future.interruptTaskWasCalled); CancellationException e = assertThrows(CancellationException.class, () -> future.get()); assertThat(e).hasCauseThat().isNull(); } public void testCancel_notDoneInterrupt() throws Exception { InterruptibleFuture future = new InterruptibleFuture(); assertTrue(future.cancel(true)); assertTrue(future.isCancelled()); assertTrue(future.isDone()); assertTrue(future.wasInterrupted()); assertTrue(future.interruptTaskWasCalled); CancellationException e = assertThrows(CancellationException.class, () -> future.get()); assertThat(e).hasCauseThat().isNull(); } public void testCancel_done() throws Exception { AbstractFuture future = new AbstractFuture() { { set("foo"); } }; assertFalse(future.cancel(true)); assertFalse(future.isCancelled()); assertTrue(future.isDone()); } public void testGetWithTimeoutDoneFuture() throws Exception { AbstractFuture future = new AbstractFuture() { { set("foo"); } }; assertEquals("foo", future.get(0, TimeUnit.SECONDS)); } public void testEvilFuture_setFuture() throws Exception { final RuntimeException exception = new RuntimeException("you didn't say the magic word!"); AbstractFuture evilFuture = new AbstractFuture() { @Override public void addListener(Runnable r, Executor e) { throw exception; } }; AbstractFuture normalFuture = new AbstractFuture() {}; normalFuture.setFuture(evilFuture); assertTrue(normalFuture.isDone()); ExecutionException e = assertThrows(ExecutionException.class, () -> normalFuture.get()); assertThat(e).hasCauseThat().isSameInstanceAs(exception); } public void testRemoveWaiter_interruption() throws Exception { final AbstractFuture future = new AbstractFuture() {}; WaiterThread waiter1 = new WaiterThread(future); waiter1.start(); waiter1.awaitWaiting(); WaiterThread waiter2 = new WaiterThread(future); waiter2.start(); waiter2.awaitWaiting(); // The waiter queue should be waiter2->waiter1 // This should wake up waiter1 and cause the waiter1 node to be removed. waiter1.interrupt(); waiter1.join(); waiter2.awaitWaiting(); // should still be blocked LockSupport.unpark(waiter2); // spurious wakeup waiter2.awaitWaiting(); // should eventually re-park future.set(null); waiter2.join(); } public void testRemoveWaiter_polling() throws Exception { final AbstractFuture future = new AbstractFuture() {}; WaiterThread waiter = new WaiterThread(future); waiter.start(); waiter.awaitWaiting(); PollingThread poller = new PollingThread(future); poller.start(); PollingThread poller2 = new PollingThread(future); poller2.start(); PollingThread poller3 = new PollingThread(future); poller3.start(); poller.awaitInLoop(); poller2.awaitInLoop(); poller3.awaitInLoop(); // The waiter queue should be {poller x 3}->waiter1 waiter.interrupt(); // This should wake up waiter1 and cause the waiter1 node to be removed. waiter.join(); future.set(null); poller.join(); } public void testToString_allUnique() throws Exception { // Two futures should not have the same toString, to avoid people asserting on it assertThat(SettableFuture.create().toString()).isNotEqualTo(SettableFuture.create().toString()); } public void testToString_oom() throws Exception { SettableFuture future = SettableFuture.create(); future.set( new Object() { @Override public String toString() { throw new OutOfMemoryError(); } @Override public int hashCode() { throw new OutOfMemoryError(); } }); String unused = future.toString(); SettableFuture future2 = SettableFuture.create(); // A more organic OOM from a toString implementation Object object = new Object() { @Override public String toString() { return new String(new char[50_000]); } }; List list = Collections.singletonList(object); for (int i = 0; i < 10; i++) { Object[] array = new Object[500]; Arrays.fill(array, list); list = Arrays.asList(array); } future2.set(list); unused = future.toString(); } public void testToString_notDone() throws Exception { AbstractFuture testFuture = new AbstractFuture() { @Override public String pendingToString() { return "cause=[Because this test isn't done]"; } }; assertThat(testFuture.toString()) .matches( "[^\\[]+\\[status=PENDING, info=\\[cause=\\[Because this test isn't done\\]\\]\\]"); TimeoutException e = assertThrows(TimeoutException.class, () -> testFuture.get(1, TimeUnit.NANOSECONDS)); assertThat(e.getMessage()).contains("1 nanoseconds"); assertThat(e.getMessage()).contains("Because this test isn't done"); } public void testToString_completesDuringToString() throws Exception { AbstractFuture testFuture = new AbstractFuture() { @Override public String pendingToString() { // Complete ourselves during the toString calculation this.set(true); return "cause=[Because this test isn't done]"; } }; assertThat(testFuture.toString()) .matches("[^\\[]+\\[status=SUCCESS, result=\\[java.lang.Boolean@\\w+\\]\\]"); } /** * This test attempts to cause a future to wait for longer than it was requested to from a timed * get() call. As measurements of time are prone to flakiness, it tries to assert based on ranges * derived from observing how much time actually passed for various operations. */ @SuppressWarnings({"DeprecatedThreadMethods", "ThreadPriorityCheck"}) @AndroidIncompatible // Thread.suspend public void testToString_delayedTimeout() throws Exception { Integer javaVersion = Ints.tryParse(JAVA_SPECIFICATION_VERSION.value()); // Parsing to an integer might fail because Java 8 returns "1.8" instead of "8." // We can continue if it's 1.8, and we can continue if it's an integer in [9, 20). if (javaVersion != null && javaVersion >= 20) { // TODO(b/261217224): Make this test work under newer JDKs. return; } TimedWaiterThread thread = new TimedWaiterThread(new AbstractFuture() {}, 2, TimeUnit.SECONDS); thread.start(); thread.awaitWaiting(); thread.suspend(); // Sleep for enough time to add 1500 milliseconds of overwait to the get() call. long toWaitMillis = 3500 - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - thread.startTime); Thread.sleep(toWaitMillis); thread.setPriority(Thread.MAX_PRIORITY); thread.resume(); thread.join(); // It's possible to race and suspend the thread just before the park call actually takes effect, // causing the thread to be suspended for 3.5 seconds, and then park itself for 2 seconds after // being resumed. To avoid a flake in this scenario, calculate how long that thread actually // waited and assert based on that time. Empirically, the race where the thread ends up waiting // for 5.5 seconds happens about 2% of the time. boolean longWait = TimeUnit.NANOSECONDS.toSeconds(thread.timeSpentBlocked) >= 5; // Count how long it actually took to return; we'll accept any number between the expected delay // and the approximate actual delay, to be robust to variance in thread scheduling. char overWaitNanosFirstDigit = Long.toString( thread.timeSpentBlocked - TimeUnit.MILLISECONDS.toNanos(longWait ? 5000 : 3000)) .charAt(0); if (overWaitNanosFirstDigit < '4') { overWaitNanosFirstDigit = '9'; } String nanosRegex = "[4-" + overWaitNanosFirstDigit + "][0-9]+"; assertWithMessage( "Spent " + thread.timeSpentBlocked + " ns blocked; slept for " + toWaitMillis + " ms") .that(thread.exception) .hasMessageThat() .matches( "Waited 2 seconds \\(plus " + (longWait ? "3" : "1") + " seconds, " + nanosRegex + " nanoseconds delay\\).*"); } public void testToString_completed() throws Exception { AbstractFuture testFuture2 = new AbstractFuture() { @Override public String pendingToString() { return "cause=[Someday...]"; } }; AbstractFuture testFuture3 = new AbstractFuture() {}; testFuture3.setFuture(testFuture2); assertThat(testFuture3.toString()) .matches( "[^\\[]+\\[status=PENDING, setFuture=\\[[^\\[]+\\[status=PENDING," + " info=\\[cause=\\[Someday...]]]]]"); testFuture2.set("result string"); assertThat(testFuture3.toString()) .matches("[^\\[]+\\[status=SUCCESS, result=\\[java.lang.String@\\w+\\]\\]"); } public void testToString_cancelled() throws Exception { assertThat(Futures.immediateCancelledFuture().toString()) .matches("[^\\[]+\\[status=CANCELLED\\]"); } public void testToString_failed() { assertThat(Futures.immediateFailedFuture(new RuntimeException("foo")).toString()) .matches("[^\\[]+\\[status=FAILURE, cause=\\[java.lang.RuntimeException: foo\\]\\]"); } public void testToString_misbehaving() throws Exception { assertThat( new AbstractFuture() { @Override public String pendingToString() { throw new RuntimeException("I'm a misbehaving implementation"); } }.toString()) .matches( "[^\\[]+\\[status=PENDING, info=\\[Exception thrown from implementation: " + "class java.lang.RuntimeException\\]\\]"); } public void testCompletionFinishesWithDone() { ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < 50000; i++) { final AbstractFuture future = new AbstractFuture() {}; final AtomicReference errorMessage = Atomics.newReference(); executor.execute( new Runnable() { @Override public void run() { future.set("success"); if (!future.isDone()) { errorMessage.set("Set call exited before future was complete."); } } }); executor.execute( new Runnable() { @Override public void run() { future.setException(new IllegalArgumentException("failure")); if (!future.isDone()) { errorMessage.set("SetException call exited before future was complete."); } } }); executor.execute( new Runnable() { @Override public void run() { future.cancel(true); if (!future.isDone()) { errorMessage.set("Cancel call exited before future was complete."); } } }); try { future.get(); } catch (Throwable t) { // Ignore, we just wanted to block. } String error = errorMessage.get(); assertNull(error, error); } executor.shutdown(); } /** * He did the bash, he did the future bash The future bash, it was a concurrency smash He did the * bash, it caught on in a flash He did the bash, he did the future bash */ public void testFutureBash() { if (isWindows()) { return; // TODO: b/136041958 - Running very slowly on Windows CI. } final CyclicBarrier barrier = new CyclicBarrier( 6 // for the setter threads + 50 // for the listeners + 50 // for the blocking get threads, + 1); // for the main thread final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); final AtomicReference> currentFuture = Atomics.newReference(); final AtomicInteger numSuccessfulSetCalls = new AtomicInteger(); Callable<@Nullable Void> completeSuccessfullyRunnable = new Callable<@Nullable Void>() { @Override public @Nullable Void call() { if (currentFuture.get().set("set")) { numSuccessfulSetCalls.incrementAndGet(); } awaitUnchecked(barrier); return null; } }; Callable<@Nullable Void> completeExceptionallyRunnable = new Callable<@Nullable Void>() { Exception failureCause = new Exception("setException"); @Override public @Nullable Void call() { if (currentFuture.get().setException(failureCause)) { numSuccessfulSetCalls.incrementAndGet(); } awaitUnchecked(barrier); return null; } }; Callable<@Nullable Void> cancelRunnable = new Callable<@Nullable Void>() { @Override public @Nullable Void call() { if (currentFuture.get().cancel(true)) { numSuccessfulSetCalls.incrementAndGet(); } awaitUnchecked(barrier); return null; } }; Callable<@Nullable Void> setFutureCompleteSuccessfullyRunnable = new Callable<@Nullable Void>() { ListenableFuture future = Futures.immediateFuture("setFuture"); @Override public @Nullable Void call() { if (currentFuture.get().setFuture(future)) { numSuccessfulSetCalls.incrementAndGet(); } awaitUnchecked(barrier); return null; } }; Callable<@Nullable Void> setFutureCompleteExceptionallyRunnable = new Callable<@Nullable Void>() { ListenableFuture future = Futures.immediateFailedFuture(new Exception("setFuture")); @Override public @Nullable Void call() { if (currentFuture.get().setFuture(future)) { numSuccessfulSetCalls.incrementAndGet(); } awaitUnchecked(barrier); return null; } }; Callable<@Nullable Void> setFutureCancelRunnable = new Callable<@Nullable Void>() { ListenableFuture future = Futures.immediateCancelledFuture(); @Override public @Nullable Void call() { if (currentFuture.get().setFuture(future)) { numSuccessfulSetCalls.incrementAndGet(); } awaitUnchecked(barrier); return null; } }; final Set finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); Runnable collectResultsRunnable = new Runnable() { @Override public void run() { try { String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); finalResults.add(result); } catch (ExecutionException e) { finalResults.add(e.getCause()); } catch (CancellationException e) { finalResults.add(CancellationException.class); } finally { awaitUnchecked(barrier); } } }; Runnable collectResultsTimedGetRunnable = new Runnable() { @Override public void run() { Future future = currentFuture.get(); while (true) { try { String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS); finalResults.add(result); break; } catch (ExecutionException e) { finalResults.add(e.getCause()); break; } catch (CancellationException e) { finalResults.add(CancellationException.class); break; } catch (TimeoutException e) { // loop } } awaitUnchecked(barrier); } }; List> allTasks = new ArrayList<>(); allTasks.add(completeSuccessfullyRunnable); allTasks.add(completeExceptionallyRunnable); allTasks.add(cancelRunnable); allTasks.add(setFutureCompleteSuccessfullyRunnable); allTasks.add(setFutureCompleteExceptionallyRunnable); allTasks.add(setFutureCancelRunnable); for (int k = 0; k < 50; k++) { // For each listener we add a task that submits it to the executor directly for the blocking // get use case and another task that adds it as a listener to the future to exercise both // racing addListener calls and addListener calls completing after the future completes. final Runnable listener = k % 2 == 0 ? collectResultsRunnable : collectResultsTimedGetRunnable; allTasks.add(Executors.callable(listener)); allTasks.add( new Callable<@Nullable Void>() { @Override public @Nullable Void call() throws Exception { currentFuture.get().addListener(listener, executor); return null; } }); } assertEquals(allTasks.size() + 1, barrier.getParties()); for (int i = 0; i < 1000; i++) { Collections.shuffle(allTasks); final AbstractFuture future = new AbstractFuture() {}; currentFuture.set(future); for (Callable task : allTasks) { @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored Future possiblyIgnoredError = executor.submit(task); } awaitUnchecked(barrier); assertThat(future.isDone()).isTrue(); // inspect state and ensure it is correct! // asserts that all get calling threads received the same value Object result = Iterables.getOnlyElement(finalResults); if (result == CancellationException.class) { assertTrue(future.isCancelled()); if (future.wasInterrupted()) { // We were cancelled, it is possible that setFuture could have succeeded too. assertThat(numSuccessfulSetCalls.get()).isIn(Range.closed(1, 2)); } else { assertThat(numSuccessfulSetCalls.get()).isEqualTo(1); } } else { assertThat(numSuccessfulSetCalls.get()).isEqualTo(1); } // reset for next iteration numSuccessfulSetCalls.set(0); finalResults.clear(); } executor.shutdown(); } // setFuture and cancel() interact in more complicated ways than the other setters. public void testSetFutureCancelBash() { if (isWindows()) { return; // TODO: b/136041958 - Running very slowly on Windows CI. } final int size = 50; final CyclicBarrier barrier = new CyclicBarrier( 2 // for the setter threads + size // for the listeners + size // for the get threads, + 1); // for the main thread final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); final AtomicReference> currentFuture = Atomics.newReference(); final AtomicReference> setFutureFuture = Atomics.newReference(); final AtomicBoolean setFutureSetSuccess = new AtomicBoolean(); final AtomicBoolean setFutureCompletionSuccess = new AtomicBoolean(); final AtomicBoolean cancellationSuccess = new AtomicBoolean(); Runnable cancelRunnable = new Runnable() { @Override public void run() { cancellationSuccess.set(currentFuture.get().cancel(true)); awaitUnchecked(barrier); } }; Runnable setFutureCompleteSuccessfullyRunnable = new Runnable() { @Override public void run() { AbstractFuture future = setFutureFuture.get(); setFutureSetSuccess.set(currentFuture.get().setFuture(future)); setFutureCompletionSuccess.set(future.set("hello-async-world")); awaitUnchecked(barrier); } }; final Set finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); Runnable collectResultsRunnable = new Runnable() { @Override public void run() { try { String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); finalResults.add(result); } catch (ExecutionException e) { finalResults.add(e.getCause()); } catch (CancellationException e) { finalResults.add(CancellationException.class); } finally { awaitUnchecked(barrier); } } }; Runnable collectResultsTimedGetRunnable = new Runnable() { @Override public void run() { Future future = currentFuture.get(); while (true) { try { String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS); finalResults.add(result); break; } catch (ExecutionException e) { finalResults.add(e.getCause()); break; } catch (CancellationException e) { finalResults.add(CancellationException.class); break; } catch (TimeoutException e) { // loop } } awaitUnchecked(barrier); } }; List allTasks = new ArrayList<>(); allTasks.add(cancelRunnable); allTasks.add(setFutureCompleteSuccessfullyRunnable); for (int k = 0; k < size; k++) { // For each listener we add a task that submits it to the executor directly for the blocking // get use case and another task that adds it as a listener to the future to exercise both // racing addListener calls and addListener calls completing after the future completes. final Runnable listener = k % 2 == 0 ? collectResultsRunnable : collectResultsTimedGetRunnable; allTasks.add(listener); allTasks.add( new Runnable() { @Override public void run() { currentFuture.get().addListener(listener, executor); } }); } assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check for (int i = 0; i < 1000; i++) { Collections.shuffle(allTasks); final AbstractFuture future = new AbstractFuture() {}; final AbstractFuture setFuture = new AbstractFuture() {}; currentFuture.set(future); setFutureFuture.set(setFuture); for (Runnable task : allTasks) { executor.execute(task); } awaitUnchecked(barrier); assertThat(future.isDone()).isTrue(); // inspect state and ensure it is correct! // asserts that all get calling threads received the same value Object result = Iterables.getOnlyElement(finalResults); if (result == CancellationException.class) { assertTrue(future.isCancelled()); assertTrue(cancellationSuccess.get()); // cancellation can interleave in 3 ways // 1. prior to setFuture // 2. after setFuture before set() on the future assigned // 3. after setFuture and set() are called but before the listener completes. if (!setFutureSetSuccess.get() || !setFutureCompletionSuccess.get()) { // If setFuture fails or set on the future fails then it must be because that future was // cancelled assertTrue(setFuture.isCancelled()); assertTrue(setFuture.wasInterrupted()); // we only call cancel(true) } } else { // set on the future completed assertFalse(cancellationSuccess.get()); assertTrue(setFutureSetSuccess.get()); assertTrue(setFutureCompletionSuccess.get()); } // reset for next iteration setFutureSetSuccess.set(false); setFutureCompletionSuccess.set(false); cancellationSuccess.set(false); finalResults.clear(); } executor.shutdown(); } // Test to ensure that when calling setFuture with a done future only setFuture or cancel can // return true. public void testSetFutureCancelBash_withDoneFuture() { final CyclicBarrier barrier = new CyclicBarrier( 2 // for the setter threads + 1 // for the blocking get thread, + 1); // for the main thread final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); final AtomicReference> currentFuture = Atomics.newReference(); final AtomicBoolean setFutureSuccess = new AtomicBoolean(); final AtomicBoolean cancellationSuccess = new AtomicBoolean(); Callable<@Nullable Void> cancelRunnable = new Callable<@Nullable Void>() { @Override public @Nullable Void call() { cancellationSuccess.set(currentFuture.get().cancel(true)); awaitUnchecked(barrier); return null; } }; Callable<@Nullable Void> setFutureCompleteSuccessfullyRunnable = new Callable<@Nullable Void>() { final ListenableFuture future = Futures.immediateFuture("hello"); @Override public @Nullable Void call() { setFutureSuccess.set(currentFuture.get().setFuture(future)); awaitUnchecked(barrier); return null; } }; final Set finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet()); final Runnable collectResultsRunnable = new Runnable() { @Override public void run() { try { String result = Uninterruptibles.getUninterruptibly(currentFuture.get()); finalResults.add(result); } catch (ExecutionException e) { finalResults.add(e.getCause()); } catch (CancellationException e) { finalResults.add(CancellationException.class); } finally { awaitUnchecked(barrier); } } }; List> allTasks = new ArrayList<>(); allTasks.add(cancelRunnable); allTasks.add(setFutureCompleteSuccessfullyRunnable); allTasks.add(Executors.callable(collectResultsRunnable)); assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check for (int i = 0; i < 1000; i++) { Collections.shuffle(allTasks); final AbstractFuture future = new AbstractFuture() {}; currentFuture.set(future); for (Callable task : allTasks) { @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored Future possiblyIgnoredError = executor.submit(task); } awaitUnchecked(barrier); assertThat(future.isDone()).isTrue(); // inspect state and ensure it is correct! // asserts that all get calling threads received the same value Object result = Iterables.getOnlyElement(finalResults); if (result == CancellationException.class) { assertTrue(future.isCancelled()); assertTrue(cancellationSuccess.get()); assertFalse(setFutureSuccess.get()); } else { assertTrue(setFutureSuccess.get()); assertFalse(cancellationSuccess.get()); } // reset for next iteration setFutureSuccess.set(false); cancellationSuccess.set(false); finalResults.clear(); } executor.shutdown(); } // In a previous implementation this would cause a stack overflow after ~2000 futures chained // together. Now it should only be limited by available memory (and time) public void testSetFuture_stackOverflow() { SettableFuture orig = SettableFuture.create(); SettableFuture prev = orig; for (int i = 0; i < 100000; i++) { SettableFuture curr = SettableFuture.create(); prev.setFuture(curr); prev = curr; } // prev represents the 'innermost' future prev.set("done"); assertTrue(orig.isDone()); } // Verify that StackOverflowError in a long chain of SetFuture doesn't cause the entire toString // call to fail @GwtIncompatible @AndroidIncompatible public void testSetFutureToString_stackOverflow() { SettableFuture orig = SettableFuture.create(); SettableFuture prev = orig; for (int i = 0; i < 100000; i++) { SettableFuture curr = SettableFuture.create(); prev.setFuture(curr); prev = curr; } // orig represents the 'outermost' future assertThat(orig.toString()) .contains("Exception thrown from implementation: class java.lang.StackOverflowError"); } public void testSetFuture_misbehavingFutureThrows() throws Exception { SettableFuture future = SettableFuture.create(); ListenableFuture badFuture = new ListenableFuture() { @Override public boolean cancel(boolean interrupt) { return false; } @Override public boolean isDone() { return true; } @Override public boolean isCancelled() { return false; // BAD!! } @Override public String get() { throw new CancellationException(); // BAD!! } @Override public String get(long time, TimeUnit unit) { throw new CancellationException(); // BAD!! } @Override public void addListener(Runnable runnable, Executor executor) { executor.execute(runnable); } }; future.setFuture(badFuture); ExecutionException expected = getExpectingExecutionException(future); assertThat(expected).hasCauseThat().isInstanceOf(IllegalArgumentException.class); assertThat(expected).hasCauseThat().hasMessageThat().contains(badFuture.toString()); } public void testSetFuture_misbehavingFutureDoesNotThrow() throws Exception { SettableFuture future = SettableFuture.create(); ListenableFuture badFuture = new ListenableFuture() { @Override public boolean cancel(boolean interrupt) { return false; } @Override public boolean isDone() { return true; } @Override public boolean isCancelled() { return true; // BAD!! } @Override public String get() { return "foo"; // BAD!! } @Override public String get(long time, TimeUnit unit) { return "foo"; // BAD!! } @Override public void addListener(Runnable runnable, Executor executor) { executor.execute(runnable); } }; future.setFuture(badFuture); assertThat(future.isCancelled()).isTrue(); } public void testCancel_stackOverflow() { SettableFuture orig = SettableFuture.create(); SettableFuture prev = orig; for (int i = 0; i < 100000; i++) { SettableFuture curr = SettableFuture.create(); prev.setFuture(curr); prev = curr; } // orig is the 'outermost future', this should propagate fully down the stack of futures. orig.cancel(true); assertTrue(orig.isCancelled()); assertTrue(prev.isCancelled()); assertTrue(prev.wasInterrupted()); } public void testSetFutureSelf_cancel() { SettableFuture orig = SettableFuture.create(); orig.setFuture(orig); orig.cancel(true); assertTrue(orig.isCancelled()); } public void testSetFutureSelf_toString() { SettableFuture orig = SettableFuture.create(); orig.setFuture(orig); assertThat(orig.toString()).contains("[status=PENDING, setFuture=[this future]]"); } public void testSetSelf_toString() { SettableFuture orig = SettableFuture.create(); orig.set(orig); assertThat(orig.toString()).contains("[status=SUCCESS, result=[this future]]"); } public void testSetFutureSelf_toStringException() { SettableFuture orig = SettableFuture.create(); orig.setFuture( new AbstractFuture() { @Override public String toString() { throw new NullPointerException(); } }); assertThat(orig.toString()) .contains( "[status=PENDING, setFuture=[Exception thrown from implementation: class" + " java.lang.NullPointerException]]"); } public void testSetIndirectSelf_toString() { final SettableFuture orig = SettableFuture.create(); // unlike the above this indirection defeats the trivial cycle detection and causes a SOE orig.setFuture( new ForwardingListenableFuture() { @Override protected ListenableFuture delegate() { return orig; } }); assertThat(orig.toString()) .contains("Exception thrown from implementation: class java.lang.StackOverflowError"); } // Regression test for a case where we would fail to execute listeners immediately on done futures // this would be observable from an afterDone callback public void testListenersExecuteImmediately_fromAfterDone() { AbstractFuture f = new AbstractFuture() { @Override protected void afterDone() { final AtomicBoolean ranImmediately = new AtomicBoolean(); addListener( new Runnable() { @Override public void run() { ranImmediately.set(true); } }, MoreExecutors.directExecutor()); assertThat(ranImmediately.get()).isTrue(); } }; f.set("foo"); } // Regression test for a case where we would fail to execute listeners immediately on done futures // this would be observable from a waiter that was just unblocked. public void testListenersExecuteImmediately_afterWaiterWakesUp() throws Exception { final AbstractFuture f = new AbstractFuture() { @Override protected void afterDone() { // this simply delays executing listeners try { Thread.sleep(TimeUnit.SECONDS.toMillis(10)); } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); // preserve status } } }; Thread t = new Thread() { @Override public void run() { f.set("foo"); } }; t.start(); f.get(); final AtomicBoolean ranImmediately = new AtomicBoolean(); f.addListener( new Runnable() { @Override public void run() { ranImmediately.set(true); } }, MoreExecutors.directExecutor()); assertThat(ranImmediately.get()).isTrue(); t.interrupt(); t.join(); } public void testCatchesUndeclaredThrowableFromListener() { AbstractFuture f = new AbstractFuture() {}; f.set("foo"); f.addListener(() -> sneakyThrow(new SomeCheckedException()), directExecutor()); } private static final class SomeCheckedException extends Exception {} /** Throws an undeclared checked exception. */ private static void sneakyThrow(Throwable t) { class SneakyThrower { @SuppressWarnings("unchecked") // intentionally unsafe for test void throwIt(Throwable t) throws T { throw (T) t; } } new SneakyThrower().throwIt(t); } public void testTrustedGetFailure_Completed() { SettableFuture future = SettableFuture.create(); future.set("261"); assertThat(future.tryInternalFastPathGetFailure()).isNull(); } public void testTrustedGetFailure_Failed() { SettableFuture future = SettableFuture.create(); Throwable failure = new Throwable(); future.setException(failure); assertThat(future.tryInternalFastPathGetFailure()).isEqualTo(failure); } public void testTrustedGetFailure_NotCompleted() { SettableFuture future = SettableFuture.create(); assertThat(future.isDone()).isFalse(); assertThat(future.tryInternalFastPathGetFailure()).isNull(); } public void testTrustedGetFailure_CanceledNoCause() { SettableFuture future = SettableFuture.create(); future.cancel(false); assertThat(future.tryInternalFastPathGetFailure()).isNull(); } public void testGetFailure_Completed() { AbstractFuture future = new AbstractFuture() {}; future.set("261"); assertThat(future.tryInternalFastPathGetFailure()).isNull(); } public void testGetFailure_Failed() { AbstractFuture future = new AbstractFuture() {}; final Throwable failure = new Throwable(); future.setException(failure); assertThat(future.tryInternalFastPathGetFailure()).isNull(); } public void testGetFailure_NotCompleted() { AbstractFuture future = new AbstractFuture() {}; assertThat(future.isDone()).isFalse(); assertThat(future.tryInternalFastPathGetFailure()).isNull(); } public void testGetFailure_CanceledNoCause() { AbstractFuture future = new AbstractFuture() {}; future.cancel(false); assertThat(future.tryInternalFastPathGetFailure()).isNull(); } public void testForwardExceptionFastPath() throws Exception { class FailFuture extends InternalFutureFailureAccess implements ListenableFuture { Throwable failure; FailFuture(Throwable throwable) { failure = throwable; } @Override public boolean cancel(boolean mayInterruptIfRunning) { throw new AssertionFailedError("cancel shouldn't be called on this object"); } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { return true; } @Override public String get() throws InterruptedException, ExecutionException { throw new AssertionFailedError("get() shouldn't be called on this object"); } @Override public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return get(); } @Override protected Throwable tryInternalFastPathGetFailure() { return failure; } @Override public void addListener(Runnable listener, Executor executor) { throw new AssertionFailedError("addListener() shouldn't be called on this object"); } } final RuntimeException exception = new RuntimeException("you still didn't say the magic word!"); SettableFuture normalFuture = SettableFuture.create(); normalFuture.setFuture(new FailFuture(exception)); assertTrue(normalFuture.isDone()); ExecutionException e = assertThrows(ExecutionException.class, () -> normalFuture.get()); assertSame(exception, e.getCause()); } private static void awaitUnchecked(final CyclicBarrier barrier) { try { barrier.await(); } catch (Exception e) { throw new RuntimeException(e); } } private void checkStackTrace(ExecutionException e) { // Our call site for get() should be in the trace. int index = findStackFrame(e, getClass().getName(), "getExpectingExecutionException"); assertThat(index).isNotEqualTo(0); // Above our method should be the call to get(). Don't assert on the class // because it could be some superclass. assertThat(e.getStackTrace()[index - 1].getMethodName()).isEqualTo("get"); } private static int findStackFrame(ExecutionException e, String clazz, String method) { StackTraceElement[] elements = e.getStackTrace(); for (int i = 0; i < elements.length; i++) { StackTraceElement element = elements[i]; if (element.getClassName().equals(clazz) && element.getMethodName().equals(method)) { return i; } } AssertionFailedError failure = new AssertionFailedError( "Expected element " + clazz + "." + method + " not found in stack trace"); failure.initCause(e); throw failure; } private ExecutionException getExpectingExecutionException(AbstractFuture future) throws InterruptedException { try { String got = future.get(); throw new AssertionError("Expected exception but got " + got); } catch (ExecutionException e) { return e; } } private static final class WaiterThread extends Thread { private final AbstractFuture future; private WaiterThread(AbstractFuture future) { this.future = future; } @Override public void run() { try { future.get(); } catch (Exception e) { // nothing } } void awaitWaiting() { while (!isBlocked()) { if (getState() == State.TERMINATED) { throw new RuntimeException("Thread exited"); } Thread.yield(); } } private boolean isBlocked() { return getState() == Thread.State.WAITING && LockSupport.getBlocker(this) == future; } } static final class TimedWaiterThread extends Thread { private final AbstractFuture future; private final long timeout; private final TimeUnit unit; private Exception exception; private volatile long startTime; private long timeSpentBlocked; TimedWaiterThread(AbstractFuture future, long timeout, TimeUnit unit) { this.future = future; this.timeout = timeout; this.unit = unit; } @Override public void run() { startTime = System.nanoTime(); try { future.get(timeout, unit); } catch (Exception e) { // nothing exception = e; } finally { timeSpentBlocked = System.nanoTime() - startTime; } } void awaitWaiting() { while (!isBlocked()) { if (getState() == State.TERMINATED) { throw new RuntimeException("Thread exited"); } Thread.yield(); } } private boolean isBlocked() { return getState() == Thread.State.TIMED_WAITING && LockSupport.getBlocker(this) == future; } } private static final class PollingThread extends Thread { private final AbstractFuture future; private final CountDownLatch completedIteration = new CountDownLatch(10); private PollingThread(AbstractFuture future) { this.future = future; } @Override public void run() { while (true) { try { future.get(0, TimeUnit.SECONDS); return; } catch (InterruptedException | ExecutionException e) { return; } catch (TimeoutException e) { // do nothing } finally { completedIteration.countDown(); } } } void awaitInLoop() { Uninterruptibles.awaitUninterruptibly(completedIteration); } } private static final class InterruptibleFuture extends AbstractFuture { boolean interruptTaskWasCalled; @Override protected void interruptTask() { assertFalse(interruptTaskWasCalled); interruptTaskWasCalled = true; } } private static boolean isWindows() { return OS_NAME.value().startsWith("Windows"); } }