• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2009 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkState;
20 import static com.google.common.util.concurrent.Futures.immediateFuture;
21 import static com.google.common.util.concurrent.JdkFutureAdapters.listenInPoolThread;
22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23 import static java.util.concurrent.Executors.newCachedThreadPool;
24 import static java.util.concurrent.TimeUnit.SECONDS;
25 
26 import com.google.common.testing.ClassSanityTester;
27 import com.google.common.util.concurrent.FuturesTest.ExecutorSpy;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.SynchronousQueue;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import junit.framework.AssertionFailedError;
35 import junit.framework.TestCase;
36 
37 /**
38  * Unit tests for {@link JdkFutureAdapters}.
39  *
40  * @author Sven Mawson
41  * @author Kurt Alfred Kluever
42  */
43 public class JdkFutureAdaptersTest extends TestCase {
44   private static final String DATA1 = "data";
45 
testListenInPoolThreadReturnsSameFuture()46   public void testListenInPoolThreadReturnsSameFuture() throws Exception {
47     ListenableFuture<String> listenableFuture = immediateFuture(DATA1);
48     assertSame(listenableFuture, listenInPoolThread(listenableFuture));
49   }
50 
51   private static class SingleCallListener implements Runnable {
52 
53     private boolean expectCall = false;
54     private final CountDownLatch calledCountDown = new CountDownLatch(1);
55 
56     @Override
run()57     public void run() {
58       assertTrue("Listener called before it was expected", expectCall);
59       assertFalse("Listener called more than once", wasCalled());
60       calledCountDown.countDown();
61     }
62 
expectCall()63     public void expectCall() {
64       assertFalse("expectCall is already true", expectCall);
65       expectCall = true;
66     }
67 
wasCalled()68     public boolean wasCalled() {
69       return calledCountDown.getCount() == 0;
70     }
71 
waitForCall()72     public void waitForCall() throws InterruptedException {
73       assertTrue("expectCall is false", expectCall);
74       calledCountDown.await();
75     }
76   }
77 
testListenInPoolThreadIgnoresExecutorWhenDelegateIsDone()78   public void testListenInPoolThreadIgnoresExecutorWhenDelegateIsDone() throws Exception {
79     NonListenableSettableFuture<String> abstractFuture = NonListenableSettableFuture.create();
80     abstractFuture.set(DATA1);
81     ExecutorSpy spy = new ExecutorSpy(directExecutor());
82     ListenableFuture<String> listenableFuture = listenInPoolThread(abstractFuture, spy);
83 
84     SingleCallListener singleCallListener = new SingleCallListener();
85     singleCallListener.expectCall();
86 
87     assertFalse(spy.wasExecuted);
88     assertFalse(singleCallListener.wasCalled());
89     assertTrue(listenableFuture.isDone()); // We call AbstractFuture#set above.
90 
91     // #addListener() will run the listener immediately because the Future is
92     // already finished (we explicitly set the result of it above).
93     listenableFuture.addListener(singleCallListener, directExecutor());
94     assertEquals(DATA1, listenableFuture.get());
95 
96     // 'spy' should have been ignored since 'abstractFuture' was done before
97     // a listener was added.
98     assertFalse(spy.wasExecuted);
99     assertTrue(singleCallListener.wasCalled());
100     assertTrue(listenableFuture.isDone());
101   }
102 
testListenInPoolThreadUsesGivenExecutor()103   public void testListenInPoolThreadUsesGivenExecutor() throws Exception {
104     ExecutorService executorService =
105         newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
106     NonListenableSettableFuture<String> abstractFuture = NonListenableSettableFuture.create();
107     ExecutorSpy spy = new ExecutorSpy(executorService);
108     ListenableFuture<String> listenableFuture = listenInPoolThread(abstractFuture, spy);
109 
110     SingleCallListener singleCallListener = new SingleCallListener();
111     singleCallListener.expectCall();
112 
113     assertFalse(spy.wasExecuted);
114     assertFalse(singleCallListener.wasCalled());
115     assertFalse(listenableFuture.isDone());
116 
117     listenableFuture.addListener(singleCallListener, executorService);
118     abstractFuture.set(DATA1);
119     assertEquals(DATA1, listenableFuture.get());
120     singleCallListener.waitForCall();
121 
122     assertTrue(spy.wasExecuted);
123     assertTrue(singleCallListener.wasCalled());
124     assertTrue(listenableFuture.isDone());
125   }
126 
testListenInPoolThreadCustomExecutorInterrupted()127   public void testListenInPoolThreadCustomExecutorInterrupted() throws Exception {
128     final CountDownLatch submitSuccessful = new CountDownLatch(1);
129     ExecutorService executorService =
130         new ThreadPoolExecutor(
131             0,
132             Integer.MAX_VALUE,
133             60L,
134             TimeUnit.SECONDS,
135             new SynchronousQueue<Runnable>(),
136             new ThreadFactoryBuilder().setDaemon(true).build()) {
137           @Override
138           protected void beforeExecute(Thread t, Runnable r) {
139             submitSuccessful.countDown();
140           }
141         };
142     NonListenableSettableFuture<String> abstractFuture = NonListenableSettableFuture.create();
143     ListenableFuture<String> listenableFuture = listenInPoolThread(abstractFuture, executorService);
144 
145     SingleCallListener singleCallListener = new SingleCallListener();
146     singleCallListener.expectCall();
147 
148     assertFalse(singleCallListener.wasCalled());
149     assertFalse(listenableFuture.isDone());
150 
151     listenableFuture.addListener(singleCallListener, directExecutor());
152     /*
153      * Don't shut down until the listenInPoolThread task has been accepted to
154      * run. We want to see what happens when it's interrupted, not when it's
155      * rejected.
156      */
157     submitSuccessful.await();
158     executorService.shutdownNow();
159     abstractFuture.set(DATA1);
160     assertEquals(DATA1, listenableFuture.get());
161     singleCallListener.waitForCall();
162 
163     assertTrue(singleCallListener.wasCalled());
164     assertTrue(listenableFuture.isDone());
165   }
166 
167   /** A Future that doesn't implement ListenableFuture, useful for testing listenInPoolThread. */
168   private static final class NonListenableSettableFuture<V> extends ForwardingFuture<V> {
create()169     static <V> NonListenableSettableFuture<V> create() {
170       return new NonListenableSettableFuture<V>();
171     }
172 
173     final SettableFuture<V> delegate = SettableFuture.create();
174 
175     @Override
delegate()176     protected Future<V> delegate() {
177       return delegate;
178     }
179 
set(V value)180     void set(V value) {
181       delegate.set(value);
182     }
183   }
184 
185   private static final class RuntimeExceptionThrowingFuture<V> implements Future<V> {
186     final CountDownLatch allowGetToComplete = new CountDownLatch(1);
187 
188     @Override
cancel(boolean mayInterruptIfRunning)189     public boolean cancel(boolean mayInterruptIfRunning) {
190       throw new AssertionFailedError();
191     }
192 
193     @Override
get()194     public V get() throws InterruptedException {
195       /*
196        * Wait a little to give us time to call addListener before the future's
197        * value is set in addition to the call we'll make after then.
198        */
199       allowGetToComplete.await(1, SECONDS);
200       throw new RuntimeException("expected, should be caught");
201     }
202 
203     @Override
get(long timeout, TimeUnit unit)204     public V get(long timeout, TimeUnit unit) {
205       throw new AssertionFailedError();
206     }
207 
208     @Override
isCancelled()209     public boolean isCancelled() {
210       throw new AssertionFailedError();
211     }
212 
213     @Override
isDone()214     public boolean isDone() {
215       /*
216        * If isDone is true during the call to listenInPoolThread,
217        * listenInPoolThread doesn't start a thread. Make sure it's false the
218        * first time through (and forever after, since no one else cares about
219        * it).
220        */
221       return false;
222     }
223   }
224 
225   private static final class RecordingRunnable implements Runnable {
226     final CountDownLatch wasRun = new CountDownLatch(1);
227 
228     // synchronized so that checkState works as expected.
229     @Override
run()230     public synchronized void run() {
231       checkState(wasRun.getCount() > 0);
232       wasRun.countDown();
233     }
234   }
235 
236   @SuppressWarnings("IsInstanceIncompatibleType") // intentional.
testListenInPoolThreadRunsListenerAfterRuntimeException()237   public void testListenInPoolThreadRunsListenerAfterRuntimeException() throws Exception {
238     RuntimeExceptionThrowingFuture<String> input = new RuntimeExceptionThrowingFuture<>();
239     /*
240      * The compiler recognizes that "input instanceof ListenableFuture" is
241      * impossible. We want the test, though, in case that changes in the future,
242      * so we use isInstance instead.
243      */
244     assertFalse(
245         "Can't test the main listenInPoolThread path "
246             + "if the input is already a ListenableFuture",
247         ListenableFuture.class.isInstance(input));
248     ListenableFuture<String> listenable = listenInPoolThread(input);
249     /*
250      * This will occur before the waiting get() in the
251      * listenInPoolThread-spawned thread completes:
252      */
253     RecordingRunnable earlyListener = new RecordingRunnable();
254     listenable.addListener(earlyListener, directExecutor());
255 
256     input.allowGetToComplete.countDown();
257     // Now give the get() thread time to finish:
258     assertTrue(earlyListener.wasRun.await(1, SECONDS));
259 
260     // Now test an additional addListener call, which will be run in-thread:
261     RecordingRunnable lateListener = new RecordingRunnable();
262     listenable.addListener(lateListener, directExecutor());
263     assertTrue(lateListener.wasRun.await(1, SECONDS));
264   }
265 
testAdapters_nullChecks()266   public void testAdapters_nullChecks() throws Exception {
267     new ClassSanityTester()
268         .forAllPublicStaticMethods(JdkFutureAdapters.class)
269         .thatReturn(Future.class)
270         .testNulls();
271   }
272 }
273