• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static com.google.common.truth.Truth.assertWithMessage;
21 
22 import com.google.common.collect.Iterables;
23 import com.google.common.collect.Range;
24 import com.google.common.collect.Sets;
25 import com.google.common.util.concurrent.internal.InternalFutureFailureAccess;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Set;
30 import java.util.concurrent.Callable;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.CyclicBarrier;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.atomic.AtomicReference;
44 import java.util.concurrent.locks.LockSupport;
45 import junit.framework.AssertionFailedError;
46 import junit.framework.TestCase;
47 
48 /**
49  * Tests for {@link AbstractFuture}.
50  *
51  * @author Brian Stoler
52  */
53 
54 public class AbstractFutureTest extends TestCase {
testSuccess()55   public void testSuccess() throws ExecutionException, InterruptedException {
56     final Object value = new Object();
57     assertSame(
58         value,
59         new AbstractFuture<Object>() {
60           {
61             set(value);
62           }
63         }.get());
64   }
65 
testException()66   public void testException() throws InterruptedException {
67     final Throwable failure = new Throwable();
68     AbstractFuture<String> future =
69         new AbstractFuture<String>() {
70           {
71             setException(failure);
72           }
73         };
74 
75     ExecutionException ee1 = getExpectingExecutionException(future);
76     ExecutionException ee2 = getExpectingExecutionException(future);
77 
78     // Ensure we get a unique execution exception on each get
79     assertNotSame(ee1, ee2);
80 
81     assertThat(ee1).hasCauseThat().isSameAs(failure);
82     assertThat(ee2).hasCauseThat().isSameAs(failure);
83 
84     checkStackTrace(ee1);
85     checkStackTrace(ee2);
86   }
87 
testCancel_notDoneNoInterrupt()88   public void testCancel_notDoneNoInterrupt() throws Exception {
89     InterruptibleFuture future = new InterruptibleFuture();
90     assertTrue(future.cancel(false));
91     assertTrue(future.isCancelled());
92     assertTrue(future.isDone());
93     assertFalse(future.wasInterrupted());
94     assertFalse(future.interruptTaskWasCalled);
95     try {
96       future.get();
97       fail("Expected CancellationException");
98     } catch (CancellationException e) {
99       // See AbstractFutureCancellationCauseTest for how to set causes
100       assertThat(e).hasCauseThat().isNull();
101     }
102   }
103 
testCancel_notDoneInterrupt()104   public void testCancel_notDoneInterrupt() throws Exception {
105     InterruptibleFuture future = new InterruptibleFuture();
106     assertTrue(future.cancel(true));
107     assertTrue(future.isCancelled());
108     assertTrue(future.isDone());
109     assertTrue(future.wasInterrupted());
110     assertTrue(future.interruptTaskWasCalled);
111     try {
112       future.get();
113       fail("Expected CancellationException");
114     } catch (CancellationException e) {
115       // See AbstractFutureCancellationCauseTest for how to set causes
116       assertThat(e).hasCauseThat().isNull();
117     }
118   }
119 
testCancel_done()120   public void testCancel_done() throws Exception {
121     AbstractFuture<String> future =
122         new AbstractFuture<String>() {
123           {
124             set("foo");
125           }
126         };
127     assertFalse(future.cancel(true));
128     assertFalse(future.isCancelled());
129     assertTrue(future.isDone());
130   }
131 
testGetWithTimeoutDoneFuture()132   public void testGetWithTimeoutDoneFuture() throws Exception {
133     AbstractFuture<String> future =
134         new AbstractFuture<String>() {
135           {
136             set("foo");
137           }
138         };
139     assertEquals("foo", future.get(0, TimeUnit.SECONDS));
140   }
141 
testEvilFuture_setFuture()142   public void testEvilFuture_setFuture() throws Exception {
143     final RuntimeException exception = new RuntimeException("you didn't say the magic word!");
144     AbstractFuture<String> evilFuture =
145         new AbstractFuture<String>() {
146           @Override
147           public void addListener(Runnable r, Executor e) {
148             throw exception;
149           }
150         };
151     AbstractFuture<String> normalFuture = new AbstractFuture<String>() {};
152     normalFuture.setFuture(evilFuture);
153     assertTrue(normalFuture.isDone());
154     try {
155       normalFuture.get();
156       fail();
157     } catch (ExecutionException e) {
158       assertThat(e).hasCauseThat().isSameAs(exception);
159     }
160   }
161 
testRemoveWaiter_interruption()162   public void testRemoveWaiter_interruption() throws Exception {
163     final AbstractFuture<String> future = new AbstractFuture<String>() {};
164     WaiterThread waiter1 = new WaiterThread(future);
165     waiter1.start();
166     waiter1.awaitWaiting();
167 
168     WaiterThread waiter2 = new WaiterThread(future);
169     waiter2.start();
170     waiter2.awaitWaiting();
171     // The waiter queue should be waiter2->waiter1
172 
173     // This should wake up waiter1 and cause the waiter1 node to be removed.
174     waiter1.interrupt();
175 
176     waiter1.join();
177     waiter2.awaitWaiting(); // should still be blocked
178 
179     LockSupport.unpark(waiter2); // spurious wakeup
180     waiter2.awaitWaiting(); // should eventually re-park
181 
182     future.set(null);
183     waiter2.join();
184   }
185 
testRemoveWaiter_polling()186   public void testRemoveWaiter_polling() throws Exception {
187     final AbstractFuture<String> future = new AbstractFuture<String>() {};
188     WaiterThread waiter = new WaiterThread(future);
189     waiter.start();
190     waiter.awaitWaiting();
191     PollingThread poller = new PollingThread(future);
192     poller.start();
193     PollingThread poller2 = new PollingThread(future);
194     poller2.start();
195     PollingThread poller3 = new PollingThread(future);
196     poller3.start();
197     poller.awaitInLoop();
198     poller2.awaitInLoop();
199     poller3.awaitInLoop();
200 
201     // The waiter queue should be {poller x 3}->waiter1
202     waiter.interrupt();
203 
204     // This should wake up waiter1 and cause the waiter1 node to be removed.
205     waiter.join();
206     future.set(null);
207     poller.join();
208   }
209 
testToString_allUnique()210   public void testToString_allUnique() throws Exception {
211     // Two futures should not have the same toString, to avoid people asserting on it
212     assertThat(SettableFuture.create().toString()).isNotEqualTo(SettableFuture.create().toString());
213   }
214 
testToString_notDone()215   public void testToString_notDone() throws Exception {
216     AbstractFuture<Object> testFuture =
217         new AbstractFuture<Object>() {
218           @Override
219           public String pendingToString() {
220             return "cause=[Because this test isn't done]";
221           }
222         };
223     assertThat(testFuture.toString())
224         .matches(
225             "[^\\[]+\\[status=PENDING, info=\\[cause=\\[Because this test isn't done\\]\\]\\]");
226     try {
227       testFuture.get(1, TimeUnit.NANOSECONDS);
228       fail();
229     } catch (TimeoutException e) {
230       assertThat(e.getMessage()).contains("1 nanoseconds");
231       assertThat(e.getMessage()).contains("Because this test isn't done");
232     }
233   }
234 
235   /**
236    * This test attempts to cause a future to wait for longer than it was requested to from a timed
237    * get() call. As measurements of time are prone to flakiness, it tries to assert based on ranges
238    * derived from observing how much time actually passed for various operations.
239    */
240   @SuppressWarnings({"DeprecatedThreadMethods", "ThreadPriorityCheck"})
testToString_delayedTimeout()241   public void testToString_delayedTimeout() throws Exception {
242     TimedWaiterThread thread =
243         new TimedWaiterThread(new AbstractFuture<Object>() {}, 2, TimeUnit.SECONDS);
244     thread.start();
245     thread.awaitWaiting();
246     thread.suspend();
247     // Sleep for enough time to add 1500 milliseconds of overwait to the get() call.
248     long toWaitMillis = 3500 - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - thread.startTime);
249     Thread.sleep(toWaitMillis);
250     thread.setPriority(Thread.MAX_PRIORITY);
251     thread.resume();
252     thread.join();
253     // It's possible to race and suspend the thread just before the park call actually takes effect,
254     // causing the thread to be suspended for 3.5 seconds, and then park itself for 2 seconds after
255     // being resumed. To avoid a flake in this scenario, calculate how long that thread actually
256     // waited and assert based on that time. Empirically, the race where the thread ends up waiting
257     // for 5.5 seconds happens about 2% of the time.
258     boolean longWait = TimeUnit.NANOSECONDS.toSeconds(thread.timeSpentBlocked) >= 5;
259     // Count how long it actually took to return; we'll accept any number between the expected delay
260     // and the approximate actual delay, to be robust to variance in thread scheduling.
261     char overWaitNanosFirstDigit =
262         Long.toString(
263                 thread.timeSpentBlocked - TimeUnit.MILLISECONDS.toNanos(longWait ? 5000 : 3000))
264             .charAt(0);
265     if (overWaitNanosFirstDigit < '4') {
266       overWaitNanosFirstDigit = '9';
267     }
268     String nanosRegex = "[4-" + overWaitNanosFirstDigit + "][0-9]+";
269     assertWithMessage(
270             "Spent " + thread.timeSpentBlocked + " ns blocked; slept for " + toWaitMillis + " ms")
271         .that(thread.exception)
272         .hasMessageThat()
273         .matches(
274             "Waited 2 seconds \\(plus "
275                 + (longWait ? "3" : "1")
276                 + " seconds, "
277                 + nanosRegex
278                 + " nanoseconds delay\\).*");
279   }
280 
testToString_completed()281   public void testToString_completed() throws Exception {
282     AbstractFuture<Object> testFuture2 =
283         new AbstractFuture<Object>() {
284           @Override
285           public String pendingToString() {
286             return "cause=[Someday...]";
287           }
288         };
289     AbstractFuture<Object> testFuture3 = new AbstractFuture<Object>() {};
290     testFuture3.setFuture(testFuture2);
291     assertThat(testFuture3.toString())
292         .matches(
293             "[^\\[]+\\[status=PENDING, info=\\[setFuture="
294                 + "\\[[^\\[]+\\[status=PENDING, info=\\[cause=\\[Someday...\\]\\]\\]\\]\\]\\]");
295     testFuture2.set("result string");
296     assertThat(testFuture3.toString())
297         .matches("[^\\[]+\\[status=SUCCESS, result=\\[result string\\]\\]");
298   }
299 
testToString_cancelled()300   public void testToString_cancelled() throws Exception {
301     assertThat(Futures.immediateCancelledFuture().toString())
302         .matches("[^\\[]+\\[status=CANCELLED\\]");
303   }
304 
testToString_failed()305   public void testToString_failed() {
306     assertThat(Futures.immediateFailedFuture(new RuntimeException("foo")).toString())
307         .matches("[^\\[]+\\[status=FAILURE, cause=\\[java.lang.RuntimeException: foo\\]\\]");
308   }
309 
testToString_misbehaving()310   public void testToString_misbehaving() throws Exception {
311     assertThat(
312             new AbstractFuture<Object>() {
313               @Override
314               public String pendingToString() {
315                 throw new RuntimeException("I'm a misbehaving implementation");
316               }
317             }.toString())
318         .matches(
319             "[^\\[]+\\[status=PENDING, info=\\[Exception thrown from implementation: "
320                 + "class java.lang.RuntimeException\\]\\]");
321   }
322 
testCompletionFinishesWithDone()323   public void testCompletionFinishesWithDone() {
324     ExecutorService executor = Executors.newFixedThreadPool(10);
325     for (int i = 0; i < 50000; i++) {
326       final AbstractFuture<String> future = new AbstractFuture<String>() {};
327       final AtomicReference<String> errorMessage = Atomics.newReference();
328       executor.execute(
329           new Runnable() {
330             @Override
331             public void run() {
332               future.set("success");
333               if (!future.isDone()) {
334                 errorMessage.set("Set call exited before future was complete.");
335               }
336             }
337           });
338       executor.execute(
339           new Runnable() {
340             @Override
341             public void run() {
342               future.setException(new IllegalArgumentException("failure"));
343               if (!future.isDone()) {
344                 errorMessage.set("SetException call exited before future was complete.");
345               }
346             }
347           });
348       executor.execute(
349           new Runnable() {
350             @Override
351             public void run() {
352               future.cancel(true);
353               if (!future.isDone()) {
354                 errorMessage.set("Cancel call exited before future was complete.");
355               }
356             }
357           });
358       try {
359         future.get();
360       } catch (Throwable t) {
361         // Ignore, we just wanted to block.
362       }
363       String error = errorMessage.get();
364       assertNull(error, error);
365     }
366     executor.shutdown();
367   }
368 
369   /**
370    * He did the bash, he did the future bash The future bash, it was a concurrency smash He did the
371    * bash, it caught on in a flash He did the bash, he did the future bash
372    */
373 
testFutureBash()374   public void testFutureBash() {
375     final CyclicBarrier barrier =
376         new CyclicBarrier(
377             6 // for the setter threads
378                 + 50 // for the listeners
379                 + 50 // for the blocking get threads,
380                 + 1); // for the main thread
381     final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties());
382     final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference();
383     final AtomicInteger numSuccessfulSetCalls = new AtomicInteger();
384     Callable<Void> completeSucessFullyRunnable =
385         new Callable<Void>() {
386           @Override
387           public Void call() {
388             if (currentFuture.get().set("set")) {
389               numSuccessfulSetCalls.incrementAndGet();
390             }
391             awaitUnchecked(barrier);
392             return null;
393           }
394         };
395     Callable<Void> completeExceptionallyRunnable =
396         new Callable<Void>() {
397           Exception failureCause = new Exception("setException");
398 
399           @Override
400           public Void call() {
401             if (currentFuture.get().setException(failureCause)) {
402               numSuccessfulSetCalls.incrementAndGet();
403             }
404             awaitUnchecked(barrier);
405             return null;
406           }
407         };
408     Callable<Void> cancelRunnable =
409         new Callable<Void>() {
410           @Override
411           public Void call() {
412             if (currentFuture.get().cancel(true)) {
413               numSuccessfulSetCalls.incrementAndGet();
414             }
415             awaitUnchecked(barrier);
416             return null;
417           }
418         };
419     Callable<Void> setFutureCompleteSucessFullyRunnable =
420         new Callable<Void>() {
421           ListenableFuture<String> future = Futures.immediateFuture("setFuture");
422 
423           @Override
424           public Void call() {
425             if (currentFuture.get().setFuture(future)) {
426               numSuccessfulSetCalls.incrementAndGet();
427             }
428             awaitUnchecked(barrier);
429             return null;
430           }
431         };
432     Callable<Void> setFutureCompleteExceptionallyRunnable =
433         new Callable<Void>() {
434           ListenableFuture<String> future =
435               Futures.immediateFailedFuture(new Exception("setFuture"));
436 
437           @Override
438           public Void call() {
439             if (currentFuture.get().setFuture(future)) {
440               numSuccessfulSetCalls.incrementAndGet();
441             }
442             awaitUnchecked(barrier);
443             return null;
444           }
445         };
446     Callable<Void> setFutureCancelRunnable =
447         new Callable<Void>() {
448           ListenableFuture<String> future = Futures.immediateCancelledFuture();
449 
450           @Override
451           public Void call() {
452             if (currentFuture.get().setFuture(future)) {
453               numSuccessfulSetCalls.incrementAndGet();
454             }
455             awaitUnchecked(barrier);
456             return null;
457           }
458         };
459     final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet());
460     Runnable collectResultsRunnable =
461         new Runnable() {
462           @Override
463           public void run() {
464             try {
465               String result = Uninterruptibles.getUninterruptibly(currentFuture.get());
466               finalResults.add(result);
467             } catch (ExecutionException e) {
468               finalResults.add(e.getCause());
469             } catch (CancellationException e) {
470               finalResults.add(CancellationException.class);
471             } finally {
472               awaitUnchecked(barrier);
473             }
474           }
475         };
476     Runnable collectResultsTimedGetRunnable =
477         new Runnable() {
478           @Override
479           public void run() {
480             Future<String> future = currentFuture.get();
481             while (true) {
482               try {
483                 String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS);
484                 finalResults.add(result);
485                 break;
486               } catch (ExecutionException e) {
487                 finalResults.add(e.getCause());
488                 break;
489               } catch (CancellationException e) {
490                 finalResults.add(CancellationException.class);
491                 break;
492               } catch (TimeoutException e) {
493                 // loop
494               }
495             }
496             awaitUnchecked(barrier);
497           }
498         };
499     List<Callable<?>> allTasks = new ArrayList<>();
500     allTasks.add(completeSucessFullyRunnable);
501     allTasks.add(completeExceptionallyRunnable);
502     allTasks.add(cancelRunnable);
503     allTasks.add(setFutureCompleteSucessFullyRunnable);
504     allTasks.add(setFutureCompleteExceptionallyRunnable);
505     allTasks.add(setFutureCancelRunnable);
506     for (int k = 0; k < 50; k++) {
507       // For each listener we add a task that submits it to the executor directly for the blocking
508       // get usecase and another task that adds it as a listener to the future to exercise both
509       // racing addListener calls and addListener calls completing after the future completes.
510       final Runnable listener =
511           k % 2 == 0 ? collectResultsRunnable : collectResultsTimedGetRunnable;
512       allTasks.add(Executors.callable(listener));
513       allTasks.add(
514           new Callable<Void>() {
515             @Override
516             public Void call() throws Exception {
517               currentFuture.get().addListener(listener, executor);
518               return null;
519             }
520           });
521     }
522     assertEquals(allTasks.size() + 1, barrier.getParties());
523     for (int i = 0; i < 1000; i++) {
524       Collections.shuffle(allTasks);
525       final AbstractFuture<String> future = new AbstractFuture<String>() {};
526       currentFuture.set(future);
527       for (Callable<?> task : allTasks) {
528         @SuppressWarnings("unused") // go/futurereturn-lsc
529         Future<?> possiblyIgnoredError = executor.submit(task);
530       }
531       awaitUnchecked(barrier);
532       assertThat(future.isDone()).isTrue();
533       // inspect state and ensure it is correct!
534       // asserts that all get calling threads received the same value
535       Object result = Iterables.getOnlyElement(finalResults);
536       if (result == CancellationException.class) {
537         assertTrue(future.isCancelled());
538         if (future.wasInterrupted()) {
539           // We were cancelled, it is possible that setFuture could have succeeded too.
540           assertThat(numSuccessfulSetCalls.get()).isIn(Range.closed(1, 2));
541         } else {
542           assertThat(numSuccessfulSetCalls.get()).isEqualTo(1);
543         }
544       } else {
545         assertThat(numSuccessfulSetCalls.get()).isEqualTo(1);
546       }
547       // reset for next iteration
548       numSuccessfulSetCalls.set(0);
549       finalResults.clear();
550     }
551     executor.shutdown();
552   }
553 
554   // setFuture and cancel() interact in more complicated ways than the other setters.
testSetFutureCancelBash()555   public void testSetFutureCancelBash() {
556     final int size = 50;
557     final CyclicBarrier barrier =
558         new CyclicBarrier(
559             2 // for the setter threads
560                 + size // for the listeners
561                 + size // for the get threads,
562                 + 1); // for the main thread
563     final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties());
564     final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference();
565     final AtomicReference<AbstractFuture<String>> setFutureFuture = Atomics.newReference();
566     final AtomicBoolean setFutureSetSucess = new AtomicBoolean();
567     final AtomicBoolean setFutureCompletionSucess = new AtomicBoolean();
568     final AtomicBoolean cancellationSucess = new AtomicBoolean();
569     Runnable cancelRunnable =
570         new Runnable() {
571           @Override
572           public void run() {
573             cancellationSucess.set(currentFuture.get().cancel(true));
574             awaitUnchecked(barrier);
575           }
576         };
577     Runnable setFutureCompleteSucessFullyRunnable =
578         new Runnable() {
579           @Override
580           public void run() {
581             AbstractFuture<String> future = setFutureFuture.get();
582             setFutureSetSucess.set(currentFuture.get().setFuture(future));
583             setFutureCompletionSucess.set(future.set("hello-async-world"));
584             awaitUnchecked(barrier);
585           }
586         };
587     final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet());
588     Runnable collectResultsRunnable =
589         new Runnable() {
590           @Override
591           public void run() {
592             try {
593               String result = Uninterruptibles.getUninterruptibly(currentFuture.get());
594               finalResults.add(result);
595             } catch (ExecutionException e) {
596               finalResults.add(e.getCause());
597             } catch (CancellationException e) {
598               finalResults.add(CancellationException.class);
599             } finally {
600               awaitUnchecked(barrier);
601             }
602           }
603         };
604     Runnable collectResultsTimedGetRunnable =
605         new Runnable() {
606           @Override
607           public void run() {
608             Future<String> future = currentFuture.get();
609             while (true) {
610               try {
611                 String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS);
612                 finalResults.add(result);
613                 break;
614               } catch (ExecutionException e) {
615                 finalResults.add(e.getCause());
616                 break;
617               } catch (CancellationException e) {
618                 finalResults.add(CancellationException.class);
619                 break;
620               } catch (TimeoutException e) {
621                 // loop
622               }
623             }
624             awaitUnchecked(barrier);
625           }
626         };
627     List<Runnable> allTasks = new ArrayList<>();
628     allTasks.add(cancelRunnable);
629     allTasks.add(setFutureCompleteSucessFullyRunnable);
630     for (int k = 0; k < size; k++) {
631       // For each listener we add a task that submits it to the executor directly for the blocking
632       // get usecase and another task that adds it as a listener to the future to exercise both
633       // racing addListener calls and addListener calls completing after the future completes.
634       final Runnable listener =
635           k % 2 == 0 ? collectResultsRunnable : collectResultsTimedGetRunnable;
636       allTasks.add(listener);
637       allTasks.add(
638           new Runnable() {
639             @Override
640             public void run() {
641               currentFuture.get().addListener(listener, executor);
642             }
643           });
644     }
645     assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check
646     for (int i = 0; i < 1000; i++) {
647       Collections.shuffle(allTasks);
648       final AbstractFuture<String> future = new AbstractFuture<String>() {};
649       final AbstractFuture<String> setFuture = new AbstractFuture<String>() {};
650       currentFuture.set(future);
651       setFutureFuture.set(setFuture);
652       for (Runnable task : allTasks) {
653         executor.execute(task);
654       }
655       awaitUnchecked(barrier);
656       assertThat(future.isDone()).isTrue();
657       // inspect state and ensure it is correct!
658       // asserts that all get calling threads received the same value
659       Object result = Iterables.getOnlyElement(finalResults);
660       if (result == CancellationException.class) {
661         assertTrue(future.isCancelled());
662         assertTrue(cancellationSucess.get());
663         // cancellation can interleave in 3 ways
664         // 1. prior to setFuture
665         // 2. after setFuture before set() on the future assigned
666         // 3. after setFuture and set() are called but before the listener completes.
667         if (!setFutureSetSucess.get() || !setFutureCompletionSucess.get()) {
668           // If setFuture fails or set on the future fails then it must be because that future was
669           // cancelled
670           assertTrue(setFuture.isCancelled());
671           assertTrue(setFuture.wasInterrupted()); // we only call cancel(true)
672         }
673       } else {
674         // set on the future completed
675         assertFalse(cancellationSucess.get());
676         assertTrue(setFutureSetSucess.get());
677         assertTrue(setFutureCompletionSucess.get());
678       }
679       // reset for next iteration
680       setFutureSetSucess.set(false);
681       setFutureCompletionSucess.set(false);
682       cancellationSucess.set(false);
683       finalResults.clear();
684     }
685     executor.shutdown();
686   }
687 
688   // Test to ensure that when calling setFuture with a done future only setFuture or cancel can
689   // return true.
testSetFutureCancelBash_withDoneFuture()690   public void testSetFutureCancelBash_withDoneFuture() {
691     final CyclicBarrier barrier =
692         new CyclicBarrier(
693             2 // for the setter threads
694                 + 1 // for the blocking get thread,
695                 + 1); // for the main thread
696     final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties());
697     final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference();
698     final AtomicBoolean setFutureSuccess = new AtomicBoolean();
699     final AtomicBoolean cancellationSucess = new AtomicBoolean();
700     Callable<Void> cancelRunnable =
701         new Callable<Void>() {
702           @Override
703           public Void call() {
704             cancellationSucess.set(currentFuture.get().cancel(true));
705             awaitUnchecked(barrier);
706             return null;
707           }
708         };
709     Callable<Void> setFutureCompleteSucessFullyRunnable =
710         new Callable<Void>() {
711           final ListenableFuture<String> future = Futures.immediateFuture("hello");
712 
713           @Override
714           public Void call() {
715             setFutureSuccess.set(currentFuture.get().setFuture(future));
716             awaitUnchecked(barrier);
717             return null;
718           }
719         };
720     final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet());
721     final Runnable collectResultsRunnable =
722         new Runnable() {
723           @Override
724           public void run() {
725             try {
726               String result = Uninterruptibles.getUninterruptibly(currentFuture.get());
727               finalResults.add(result);
728             } catch (ExecutionException e) {
729               finalResults.add(e.getCause());
730             } catch (CancellationException e) {
731               finalResults.add(CancellationException.class);
732             } finally {
733               awaitUnchecked(barrier);
734             }
735           }
736         };
737     List<Callable<?>> allTasks = new ArrayList<>();
738     allTasks.add(cancelRunnable);
739     allTasks.add(setFutureCompleteSucessFullyRunnable);
740     allTasks.add(Executors.callable(collectResultsRunnable));
741     assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check
742     for (int i = 0; i < 1000; i++) {
743       Collections.shuffle(allTasks);
744       final AbstractFuture<String> future = new AbstractFuture<String>() {};
745       currentFuture.set(future);
746       for (Callable<?> task : allTasks) {
747         @SuppressWarnings("unused") // go/futurereturn-lsc
748         Future<?> possiblyIgnoredError = executor.submit(task);
749       }
750       awaitUnchecked(barrier);
751       assertThat(future.isDone()).isTrue();
752       // inspect state and ensure it is correct!
753       // asserts that all get calling threads received the same value
754       Object result = Iterables.getOnlyElement(finalResults);
755       if (result == CancellationException.class) {
756         assertTrue(future.isCancelled());
757         assertTrue(cancellationSucess.get());
758         assertFalse(setFutureSuccess.get());
759       } else {
760         assertTrue(setFutureSuccess.get());
761         assertFalse(cancellationSucess.get());
762       }
763       // reset for next iteration
764       setFutureSuccess.set(false);
765       cancellationSucess.set(false);
766       finalResults.clear();
767     }
768     executor.shutdown();
769   }
770 
771   // In a previous implementation this would cause a stack overflow after ~2000 futures chained
772   // together.  Now it should only be limited by available memory (and time)
testSetFuture_stackOverflow()773   public void testSetFuture_stackOverflow() {
774     SettableFuture<String> orig = SettableFuture.create();
775     SettableFuture<String> prev = orig;
776     for (int i = 0; i < 100000; i++) {
777       SettableFuture<String> curr = SettableFuture.create();
778       prev.setFuture(curr);
779       prev = curr;
780     }
781     // prev represents the 'innermost' future
782     prev.set("done");
783     assertTrue(orig.isDone());
784   }
785 
testSetFuture_misbehavingFutureThrows()786   public void testSetFuture_misbehavingFutureThrows() throws Exception {
787     SettableFuture<String> future = SettableFuture.create();
788     ListenableFuture<String> badFuture =
789         new ListenableFuture<String>() {
790           @Override
791           public boolean cancel(boolean interrupt) {
792             return false;
793           }
794 
795           @Override
796           public boolean isDone() {
797             return true;
798           }
799 
800           @Override
801           public boolean isCancelled() {
802             return false; // BAD!!
803           }
804 
805           @Override
806           public String get() {
807             throw new CancellationException(); // BAD!!
808           }
809 
810           @Override
811           public String get(long time, TimeUnit unit) {
812             throw new CancellationException(); // BAD!!
813           }
814 
815           @Override
816           public void addListener(Runnable runnable, Executor executor) {
817             executor.execute(runnable);
818           }
819         };
820     future.setFuture(badFuture);
821     ExecutionException expected = getExpectingExecutionException(future);
822     assertThat(expected).hasCauseThat().isInstanceOf(IllegalArgumentException.class);
823     assertThat(expected).hasCauseThat().hasMessageThat().contains(badFuture.toString());
824   }
825 
testSetFuture_misbehavingFutureDoesNotThrow()826   public void testSetFuture_misbehavingFutureDoesNotThrow() throws Exception {
827     SettableFuture<String> future = SettableFuture.create();
828     ListenableFuture<String> badFuture =
829         new ListenableFuture<String>() {
830           @Override
831           public boolean cancel(boolean interrupt) {
832             return false;
833           }
834 
835           @Override
836           public boolean isDone() {
837             return true;
838           }
839 
840           @Override
841           public boolean isCancelled() {
842             return true; // BAD!!
843           }
844 
845           @Override
846           public String get() {
847             return "foo"; // BAD!!
848           }
849 
850           @Override
851           public String get(long time, TimeUnit unit) {
852             return "foo"; // BAD!!
853           }
854 
855           @Override
856           public void addListener(Runnable runnable, Executor executor) {
857             executor.execute(runnable);
858           }
859         };
860     future.setFuture(badFuture);
861     assertThat(future.isCancelled()).isTrue();
862   }
863 
testCancel_stackOverflow()864   public void testCancel_stackOverflow() {
865     SettableFuture<String> orig = SettableFuture.create();
866     SettableFuture<String> prev = orig;
867     for (int i = 0; i < 100000; i++) {
868       SettableFuture<String> curr = SettableFuture.create();
869       prev.setFuture(curr);
870       prev = curr;
871     }
872     // orig is the 'outermost future', this should propagate fully down the stack of futures.
873     orig.cancel(true);
874     assertTrue(orig.isCancelled());
875     assertTrue(prev.isCancelled());
876     assertTrue(prev.wasInterrupted());
877   }
878 
testSetFutureSelf_cancel()879   public void testSetFutureSelf_cancel() {
880     SettableFuture<String> orig = SettableFuture.create();
881     orig.setFuture(orig);
882     orig.cancel(true);
883     assertTrue(orig.isCancelled());
884   }
885 
testSetFutureSelf_toString()886   public void testSetFutureSelf_toString() {
887     SettableFuture<String> orig = SettableFuture.create();
888     orig.setFuture(orig);
889     assertThat(orig.toString()).contains("[status=PENDING, info=[setFuture=[this future]]]");
890   }
891 
testSetSelf_toString()892   public void testSetSelf_toString() {
893     SettableFuture<Object> orig = SettableFuture.create();
894     orig.set(orig);
895     assertThat(orig.toString()).contains("[status=SUCCESS, result=[this future]]");
896   }
897 
testSetIndirectSelf_toString()898   public void testSetIndirectSelf_toString() {
899     final SettableFuture<Object> orig = SettableFuture.create();
900     // unlike the above this indirection defeats the trivial cycle detection and causes a SOE
901     orig.set(
902         new Object() {
903           @Override
904           public String toString() {
905             return orig.toString();
906           }
907         });
908     try {
909       orig.toString();
910       fail();
911     } catch (StackOverflowError expected) {
912     }
913   }
914 
915   // Regression test for a case where we would fail to execute listeners immediately on done futures
916   // this would be observable from an afterDone callback
testListenersExecuteImmediately_fromAfterDone()917   public void testListenersExecuteImmediately_fromAfterDone() {
918     AbstractFuture<String> f =
919         new AbstractFuture<String>() {
920           @Override
921           protected void afterDone() {
922             final AtomicBoolean ranImmediately = new AtomicBoolean();
923             addListener(
924                 new Runnable() {
925                   @Override
926                   public void run() {
927                     ranImmediately.set(true);
928                   }
929                 },
930                 MoreExecutors.directExecutor());
931             assertThat(ranImmediately.get()).isTrue();
932           }
933         };
934     f.set("foo");
935   }
936 
937   // Regression test for a case where we would fail to execute listeners immediately on done futures
938   // this would be observable from a waiter that was just unblocked.
testListenersExecuteImmediately_afterWaiterWakesUp()939   public void testListenersExecuteImmediately_afterWaiterWakesUp() throws Exception {
940     final AbstractFuture<String> f =
941         new AbstractFuture<String>() {
942           @Override
943           protected void afterDone() {
944             // this simply delays executing listeners
945             try {
946               Thread.sleep(TimeUnit.SECONDS.toMillis(10));
947             } catch (InterruptedException ignored) {
948               Thread.currentThread().interrupt(); // preserve status
949             }
950           }
951         };
952     Thread t =
953         new Thread() {
954           @Override
955           public void run() {
956             f.set("foo");
957           }
958         };
959     t.start();
960     f.get();
961     final AtomicBoolean ranImmediately = new AtomicBoolean();
962     f.addListener(
963         new Runnable() {
964           @Override
965           public void run() {
966             ranImmediately.set(true);
967           }
968         },
969         MoreExecutors.directExecutor());
970     assertThat(ranImmediately.get()).isTrue();
971     t.interrupt();
972     t.join();
973   }
974 
testTrustedGetFailure_Completed()975   public void testTrustedGetFailure_Completed() {
976     SettableFuture<String> future = SettableFuture.create();
977     future.set("261");
978     assertThat(future.tryInternalFastPathGetFailure()).isNull();
979   }
980 
testTrustedGetFailure_Failed()981   public void testTrustedGetFailure_Failed() {
982     SettableFuture<String> future = SettableFuture.create();
983     Throwable failure = new Throwable();
984     future.setException(failure);
985     assertThat(future.tryInternalFastPathGetFailure()).isEqualTo(failure);
986   }
987 
testTrustedGetFailure_NotCompleted()988   public void testTrustedGetFailure_NotCompleted() {
989     SettableFuture<String> future = SettableFuture.create();
990     assertThat(future.isDone()).isFalse();
991     assertThat(future.tryInternalFastPathGetFailure()).isNull();
992   }
993 
testTrustedGetFailure_CanceledNoCause()994   public void testTrustedGetFailure_CanceledNoCause() {
995     SettableFuture<String> future = SettableFuture.create();
996     future.cancel(false);
997     assertThat(future.tryInternalFastPathGetFailure()).isNull();
998   }
999 
testGetFailure_Completed()1000   public void testGetFailure_Completed() {
1001     AbstractFuture<String> future = new AbstractFuture<String>() {};
1002     future.set("261");
1003     assertThat(future.tryInternalFastPathGetFailure()).isNull();
1004   }
1005 
testGetFailure_Failed()1006   public void testGetFailure_Failed() {
1007     AbstractFuture<String> future = new AbstractFuture<String>() {};
1008     final Throwable failure = new Throwable();
1009     future.setException(failure);
1010     assertThat(future.tryInternalFastPathGetFailure()).isNull();
1011   }
1012 
testGetFailure_NotCompleted()1013   public void testGetFailure_NotCompleted() {
1014     AbstractFuture<String> future = new AbstractFuture<String>() {};
1015     assertThat(future.isDone()).isFalse();
1016     assertThat(future.tryInternalFastPathGetFailure()).isNull();
1017   }
1018 
testGetFailure_CanceledNoCause()1019   public void testGetFailure_CanceledNoCause() {
1020     AbstractFuture<String> future = new AbstractFuture<String>() {};
1021     future.cancel(false);
1022     assertThat(future.tryInternalFastPathGetFailure()).isNull();
1023   }
1024 
testForwardExceptionFastPath()1025   public void testForwardExceptionFastPath() throws Exception {
1026     class FailFuture extends InternalFutureFailureAccess implements ListenableFuture<String> {
1027       Throwable failure;
1028 
1029       FailFuture(Throwable throwable) {
1030         failure = throwable;
1031       }
1032 
1033       @Override
1034       public boolean cancel(boolean mayInterruptIfRunning) {
1035         throw new AssertionFailedError("cancel shouldn't be called on this object");
1036       }
1037 
1038       @Override
1039       public boolean isCancelled() {
1040         return false;
1041       }
1042 
1043       @Override
1044       public boolean isDone() {
1045         return true;
1046       }
1047 
1048       @Override
1049       public String get() throws InterruptedException, ExecutionException {
1050         throw new AssertionFailedError("get() shouldn't be called on this object");
1051       }
1052 
1053       @Override
1054       public String get(long timeout, TimeUnit unit)
1055           throws InterruptedException, ExecutionException, TimeoutException {
1056         return get();
1057       }
1058 
1059       @Override
1060       protected Throwable tryInternalFastPathGetFailure() {
1061         return failure;
1062       }
1063 
1064       @Override
1065       public void addListener(Runnable listener, Executor executor) {
1066         throw new AssertionFailedError("addListener() shouldn't be called on this object");
1067       }
1068     }
1069 
1070     final RuntimeException exception = new RuntimeException("you still didn't say the magic word!");
1071     SettableFuture<String> normalFuture = SettableFuture.create();
1072     normalFuture.setFuture(new FailFuture(exception));
1073     assertTrue(normalFuture.isDone());
1074     try {
1075       normalFuture.get();
1076       fail();
1077     } catch (ExecutionException e) {
1078       assertSame(exception, e.getCause());
1079     }
1080   }
1081 
awaitUnchecked(final CyclicBarrier barrier)1082   private static void awaitUnchecked(final CyclicBarrier barrier) {
1083     try {
1084       barrier.await();
1085     } catch (Exception e) {
1086       throw new RuntimeException(e);
1087     }
1088   }
1089 
checkStackTrace(ExecutionException e)1090   private void checkStackTrace(ExecutionException e) {
1091     // Our call site for get() should be in the trace.
1092     int index = findStackFrame(e, getClass().getName(), "getExpectingExecutionException");
1093 
1094     assertThat(index).isNotEqualTo(0);
1095 
1096     // Above our method should be the call to get(). Don't assert on the class
1097     // because it could be some superclass.
1098     assertThat(e.getStackTrace()[index - 1].getMethodName()).isEqualTo("get");
1099   }
1100 
findStackFrame(ExecutionException e, String clazz, String method)1101   private static int findStackFrame(ExecutionException e, String clazz, String method) {
1102     StackTraceElement[] elements = e.getStackTrace();
1103     for (int i = 0; i < elements.length; i++) {
1104       StackTraceElement element = elements[i];
1105       if (element.getClassName().equals(clazz) && element.getMethodName().equals(method)) {
1106         return i;
1107       }
1108     }
1109     AssertionFailedError failure =
1110         new AssertionFailedError(
1111             "Expected element " + clazz + "." + method + " not found in stack trace");
1112     failure.initCause(e);
1113     throw failure;
1114   }
1115 
getExpectingExecutionException(AbstractFuture<String> future)1116   private ExecutionException getExpectingExecutionException(AbstractFuture<String> future)
1117       throws InterruptedException {
1118     try {
1119       String got = future.get();
1120       fail("Expected exception but got " + got);
1121     } catch (ExecutionException e) {
1122       return e;
1123     }
1124 
1125     // unreachable, but compiler doesn't know that fail() always throws
1126     return null;
1127   }
1128 
1129   private static final class WaiterThread extends Thread {
1130     private final AbstractFuture<?> future;
1131 
WaiterThread(AbstractFuture<?> future)1132     private WaiterThread(AbstractFuture<?> future) {
1133       this.future = future;
1134     }
1135 
1136     @Override
run()1137     public void run() {
1138       try {
1139         future.get();
1140       } catch (Exception e) {
1141         // nothing
1142       }
1143     }
1144 
awaitWaiting()1145     void awaitWaiting() {
1146       while (!isBlocked()) {
1147         if (getState() == State.TERMINATED) {
1148           throw new RuntimeException("Thread exited");
1149         }
1150         Thread.yield();
1151       }
1152     }
1153 
isBlocked()1154     private boolean isBlocked() {
1155       return getState() == Thread.State.WAITING && LockSupport.getBlocker(this) == future;
1156     }
1157   }
1158 
1159   static final class TimedWaiterThread extends Thread {
1160     private final AbstractFuture<?> future;
1161     private final long timeout;
1162     private final TimeUnit unit;
1163     private Exception exception;
1164     private volatile long startTime;
1165     private long timeSpentBlocked;
1166 
TimedWaiterThread(AbstractFuture<?> future, long timeout, TimeUnit unit)1167     TimedWaiterThread(AbstractFuture<?> future, long timeout, TimeUnit unit) {
1168       this.future = future;
1169       this.timeout = timeout;
1170       this.unit = unit;
1171     }
1172 
1173     @Override
run()1174     public void run() {
1175       startTime = System.nanoTime();
1176       try {
1177         future.get(timeout, unit);
1178       } catch (Exception e) {
1179         // nothing
1180         exception = e;
1181       } finally {
1182         timeSpentBlocked = System.nanoTime() - startTime;
1183       }
1184     }
1185 
awaitWaiting()1186     void awaitWaiting() {
1187       while (!isBlocked()) {
1188         if (getState() == State.TERMINATED) {
1189           throw new RuntimeException("Thread exited");
1190         }
1191         Thread.yield();
1192       }
1193     }
1194 
isBlocked()1195     private boolean isBlocked() {
1196       return getState() == Thread.State.TIMED_WAITING && LockSupport.getBlocker(this) == future;
1197     }
1198   }
1199 
1200   private static final class PollingThread extends Thread {
1201     private final AbstractFuture<?> future;
1202     private final CountDownLatch completedIteration = new CountDownLatch(10);
1203 
PollingThread(AbstractFuture<?> future)1204     private PollingThread(AbstractFuture<?> future) {
1205       this.future = future;
1206     }
1207 
1208     @Override
run()1209     public void run() {
1210       while (true) {
1211         try {
1212           future.get(0, TimeUnit.SECONDS);
1213           return;
1214         } catch (InterruptedException | ExecutionException e) {
1215           return;
1216         } catch (TimeoutException e) {
1217           // do nothing
1218         } finally {
1219           completedIteration.countDown();
1220         }
1221       }
1222     }
1223 
awaitInLoop()1224     void awaitInLoop() {
1225       Uninterruptibles.awaitUninterruptibly(completedIteration);
1226     }
1227   }
1228 
1229   private static final class InterruptibleFuture extends AbstractFuture<String> {
1230     boolean interruptTaskWasCalled;
1231 
1232     @Override
interruptTask()1233     protected void interruptTask() {
1234       assertFalse(interruptTaskWasCalled);
1235       interruptTaskWasCalled = true;
1236     }
1237   }
1238 }
1239