• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2009 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkState;
20 import static com.google.common.util.concurrent.Futures.immediateFuture;
21 import static com.google.common.util.concurrent.JdkFutureAdapters.listenInPoolThread;
22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23 import static java.util.concurrent.Executors.newCachedThreadPool;
24 import static java.util.concurrent.TimeUnit.SECONDS;
25 
26 import com.google.common.testing.ClassSanityTester;
27 import com.google.common.util.concurrent.FuturesTest.ExecutorSpy;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.SynchronousQueue;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import junit.framework.AssertionFailedError;
35 import junit.framework.TestCase;
36 
37 /**
38  * Unit tests for {@link JdkFutureAdapters}.
39  *
40  * @author Sven Mawson
41  * @author Kurt Alfred Kluever
42  */
43 public class JdkFutureAdaptersTest extends TestCase {
44   private static final String DATA1 = "data";
45 
testListenInPoolThreadReturnsSameFuture()46   public void testListenInPoolThreadReturnsSameFuture() throws Exception {
47     ListenableFuture<String> listenableFuture = immediateFuture(DATA1);
48     assertSame(listenableFuture, listenInPoolThread(listenableFuture));
49   }
50 
51   private static class SingleCallListener implements Runnable {
52 
53     private boolean expectCall = false;
54     private final CountDownLatch calledCountDown = new CountDownLatch(1);
55 
56     @Override
run()57     public void run() {
58       assertTrue("Listener called before it was expected", expectCall);
59       assertFalse("Listener called more than once", wasCalled());
60       calledCountDown.countDown();
61     }
62 
expectCall()63     public void expectCall() {
64       assertFalse("expectCall is already true", expectCall);
65       expectCall = true;
66     }
67 
wasCalled()68     public boolean wasCalled() {
69       return calledCountDown.getCount() == 0;
70     }
71 
waitForCall()72     public void waitForCall() throws InterruptedException {
73       assertTrue("expectCall is false", expectCall);
74       calledCountDown.await();
75     }
76   }
77 
testListenInPoolThreadIgnoresExecutorWhenDelegateIsDone()78   public void testListenInPoolThreadIgnoresExecutorWhenDelegateIsDone() throws Exception {
79     NonListenableSettableFuture<String> abstractFuture = NonListenableSettableFuture.create();
80     abstractFuture.set(DATA1);
81     ExecutorSpy spy = new ExecutorSpy(directExecutor());
82     ListenableFuture<String> listenableFuture = listenInPoolThread(abstractFuture, spy);
83 
84     SingleCallListener singleCallListener = new SingleCallListener();
85     singleCallListener.expectCall();
86 
87     assertFalse(spy.wasExecuted);
88     assertFalse(singleCallListener.wasCalled());
89     assertTrue(listenableFuture.isDone()); // We call AbstractFuture#set above.
90 
91     // #addListener() will run the listener immediately because the Future is
92     // already finished (we explicitly set the result of it above).
93     listenableFuture.addListener(singleCallListener, directExecutor());
94     assertEquals(DATA1, listenableFuture.get());
95 
96     // 'spy' should have been ignored since 'abstractFuture' was done before
97     // a listener was added.
98     assertFalse(spy.wasExecuted);
99     assertTrue(singleCallListener.wasCalled());
100     assertTrue(listenableFuture.isDone());
101   }
102 
103 
testListenInPoolThreadUsesGivenExecutor()104   public void testListenInPoolThreadUsesGivenExecutor() throws Exception {
105     ExecutorService executorService =
106         newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
107     NonListenableSettableFuture<String> abstractFuture = NonListenableSettableFuture.create();
108     ExecutorSpy spy = new ExecutorSpy(executorService);
109     ListenableFuture<String> listenableFuture = listenInPoolThread(abstractFuture, spy);
110 
111     SingleCallListener singleCallListener = new SingleCallListener();
112     singleCallListener.expectCall();
113 
114     assertFalse(spy.wasExecuted);
115     assertFalse(singleCallListener.wasCalled());
116     assertFalse(listenableFuture.isDone());
117 
118     listenableFuture.addListener(singleCallListener, executorService);
119     abstractFuture.set(DATA1);
120     assertEquals(DATA1, listenableFuture.get());
121     singleCallListener.waitForCall();
122 
123     assertTrue(spy.wasExecuted);
124     assertTrue(singleCallListener.wasCalled());
125     assertTrue(listenableFuture.isDone());
126   }
127 
128 
testListenInPoolThreadCustomExecutorInterrupted()129   public void testListenInPoolThreadCustomExecutorInterrupted() throws Exception {
130     final CountDownLatch submitSuccessful = new CountDownLatch(1);
131     ExecutorService executorService =
132         new ThreadPoolExecutor(
133             0,
134             Integer.MAX_VALUE,
135             60L,
136             TimeUnit.SECONDS,
137             new SynchronousQueue<Runnable>(),
138             new ThreadFactoryBuilder().setDaemon(true).build()) {
139           @Override
140           protected void beforeExecute(Thread t, Runnable r) {
141             submitSuccessful.countDown();
142           }
143         };
144     NonListenableSettableFuture<String> abstractFuture = NonListenableSettableFuture.create();
145     ListenableFuture<String> listenableFuture = listenInPoolThread(abstractFuture, executorService);
146 
147     SingleCallListener singleCallListener = new SingleCallListener();
148     singleCallListener.expectCall();
149 
150     assertFalse(singleCallListener.wasCalled());
151     assertFalse(listenableFuture.isDone());
152 
153     listenableFuture.addListener(singleCallListener, directExecutor());
154     /*
155      * Don't shut down until the listenInPoolThread task has been accepted to
156      * run. We want to see what happens when it's interrupted, not when it's
157      * rejected.
158      */
159     submitSuccessful.await();
160     executorService.shutdownNow();
161     abstractFuture.set(DATA1);
162     assertEquals(DATA1, listenableFuture.get());
163     singleCallListener.waitForCall();
164 
165     assertTrue(singleCallListener.wasCalled());
166     assertTrue(listenableFuture.isDone());
167   }
168 
169   /** A Future that doesn't implement ListenableFuture, useful for testing listenInPoolThread. */
170   private static final class NonListenableSettableFuture<V> extends ForwardingFuture<V> {
create()171     static <V> NonListenableSettableFuture<V> create() {
172       return new NonListenableSettableFuture<V>();
173     }
174 
175     final SettableFuture<V> delegate = SettableFuture.create();
176 
177     @Override
delegate()178     protected Future<V> delegate() {
179       return delegate;
180     }
181 
set(V value)182     void set(V value) {
183       delegate.set(value);
184     }
185   }
186 
187   private static final class RuntimeExceptionThrowingFuture<V> implements Future<V> {
188     final CountDownLatch allowGetToComplete = new CountDownLatch(1);
189 
190     @Override
cancel(boolean mayInterruptIfRunning)191     public boolean cancel(boolean mayInterruptIfRunning) {
192       throw new AssertionFailedError();
193     }
194 
195     @Override
get()196     public V get() throws InterruptedException {
197       /*
198        * Wait a little to give us time to call addListener before the future's
199        * value is set in addition to the call we'll make after then.
200        */
201       allowGetToComplete.await(1, SECONDS);
202       throw new RuntimeException("expected, should be caught");
203     }
204 
205     @Override
get(long timeout, TimeUnit unit)206     public V get(long timeout, TimeUnit unit) {
207       throw new AssertionFailedError();
208     }
209 
210     @Override
isCancelled()211     public boolean isCancelled() {
212       throw new AssertionFailedError();
213     }
214 
215     @Override
isDone()216     public boolean isDone() {
217       /*
218        * If isDone is true during the call to listenInPoolThread,
219        * listenInPoolThread doesn't start a thread. Make sure it's false the
220        * first time through (and forever after, since no one else cares about
221        * it).
222        */
223       return false;
224     }
225   }
226 
227   private static final class RecordingRunnable implements Runnable {
228     final CountDownLatch wasRun = new CountDownLatch(1);
229 
230     // synchronized so that checkState works as expected.
231     @Override
run()232     public synchronized void run() {
233       checkState(wasRun.getCount() > 0);
234       wasRun.countDown();
235     }
236   }
237 
238 
239   @SuppressWarnings("IsInstanceIncompatibleType") // intentional.
testListenInPoolThreadRunsListenerAfterRuntimeException()240   public void testListenInPoolThreadRunsListenerAfterRuntimeException() throws Exception {
241     RuntimeExceptionThrowingFuture<String> input = new RuntimeExceptionThrowingFuture<>();
242     /*
243      * The compiler recognizes that "input instanceof ListenableFuture" is
244      * impossible. We want the test, though, in case that changes in the future,
245      * so we use isInstance instead.
246      */
247     assertFalse(
248         "Can't test the main listenInPoolThread path "
249             + "if the input is already a ListenableFuture",
250         ListenableFuture.class.isInstance(input));
251     ListenableFuture<String> listenable = listenInPoolThread(input);
252     /*
253      * This will occur before the waiting get() in the
254      * listenInPoolThread-spawned thread completes:
255      */
256     RecordingRunnable earlyListener = new RecordingRunnable();
257     listenable.addListener(earlyListener, directExecutor());
258 
259     input.allowGetToComplete.countDown();
260     // Now give the get() thread time to finish:
261     assertTrue(earlyListener.wasRun.await(1, SECONDS));
262 
263     // Now test an additional addListener call, which will be run in-thread:
264     RecordingRunnable lateListener = new RecordingRunnable();
265     listenable.addListener(lateListener, directExecutor());
266     assertTrue(lateListener.wasRun.await(1, SECONDS));
267   }
268 
testAdapters_nullChecks()269   public void testAdapters_nullChecks() throws Exception {
270     new ClassSanityTester()
271         .forAllPublicStaticMethods(JdkFutureAdapters.class)
272         .thatReturn(Future.class)
273         .testNulls();
274   }
275 }
276