/*
 * Copyright (C) 2008 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
20 
21 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
22 
23 import java.lang.reflect.UndeclaredThrowableException;
24 import java.util.concurrent.CancellationException;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutionException;
27 
28 /**
29  * Unit tests for {@link Futures#transform(ListenableFuture, AsyncFunction)}.
30  *
31  * @author Nishant Thakkar
32  */
33 public class FuturesTransformAsyncFunctionTest
34     extends AbstractChainedListenableFutureTest<String> {
35   protected static final int SLOW_OUTPUT_VALID_INPUT_DATA = 2;
36   protected static final int SLOW_FUNC_VALID_INPUT_DATA = 3;
37   private static final String RESULT_DATA = "SUCCESS";
38 
39   private SettableFuture<String> outputFuture;
40   // Signals that the function is waiting to complete
41   private CountDownLatch funcIsWaitingLatch;
42   // Signals the function so it will complete
43   private CountDownLatch funcCompletionLatch;
44 
buildChainingFuture( ListenableFuture<Integer> inputFuture)45   @Override protected ListenableFuture<String> buildChainingFuture(
46       ListenableFuture<Integer> inputFuture) {
47     outputFuture = SettableFuture.create();
48     funcIsWaitingLatch = new CountDownLatch(1);
49     funcCompletionLatch = new CountDownLatch(1);
50     return Futures.transform(inputFuture, new ChainingFunction());
51   }
52 
getSuccessfulResult()53   @Override protected String getSuccessfulResult() {
54     return RESULT_DATA;
55   }
56 
57   private class ChainingFunction implements AsyncFunction<Integer, String> {
58     @Override
apply(Integer input)59     public ListenableFuture<String> apply(Integer input) {
60       switch (input) {
61         case VALID_INPUT_DATA: outputFuture.set(RESULT_DATA); break;
62         case SLOW_OUTPUT_VALID_INPUT_DATA: break;  // do nothing to the result
63         case SLOW_FUNC_VALID_INPUT_DATA:
64           funcIsWaitingLatch.countDown();
65           awaitUninterruptibly(funcCompletionLatch);
66           break;
67         default: throw new UndeclaredThrowableException(EXCEPTION);
68       }
69       return outputFuture;
70     }
71   }
72 
testFutureGetThrowsFunctionException()73   public void testFutureGetThrowsFunctionException() throws Exception {
74     inputFuture.set(EXCEPTION_DATA);
75     listener.assertException(EXCEPTION);
76   }
77 
testFutureGetThrowsCancellationIfInputCancelled()78   public void testFutureGetThrowsCancellationIfInputCancelled()
79       throws Exception {
80     inputFuture.cancel(true); // argument is ignored
81     try {
82       resultFuture.get();
83       fail("Result future must throw CancellationException"
84           + " if input future is cancelled.");
85     } catch (CancellationException expected) {}
86   }
87 
testFutureGetThrowsCancellationIfOutputCancelled()88   public void testFutureGetThrowsCancellationIfOutputCancelled()
89       throws Exception {
90     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
91     outputFuture.cancel(true); // argument is ignored
92     try {
93       resultFuture.get();
94       fail("Result future must throw CancellationException"
95           + " if function output future is cancelled.");
96     } catch (CancellationException expected) {}
97   }
98 
testFutureCancelBeforeInputCompletion()99   public void testFutureCancelBeforeInputCompletion() throws Exception {
100     assertTrue(resultFuture.cancel(true));
101     assertTrue(resultFuture.isCancelled());
102     assertTrue(inputFuture.isCancelled());
103     assertFalse(outputFuture.isCancelled());
104     try {
105       resultFuture.get();
106       fail("Result future is cancelled and should have thrown a"
107           + " CancellationException");
108     } catch (CancellationException expected) {}
109   }
110 
testFutureCancellableBeforeOutputCompletion()111   public void testFutureCancellableBeforeOutputCompletion() throws Exception {
112     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
113     assertTrue(resultFuture.cancel(true));
114     assertTrue(resultFuture.isCancelled());
115     assertFalse(inputFuture.isCancelled());
116     assertTrue(outputFuture.isCancelled());
117     try {
118       resultFuture.get();
119       fail("Result future is cancelled and should have thrown a"
120           + " CancellationException");
121     } catch (CancellationException expected) {}
122   }
123 
testFutureCancellableBeforeFunctionCompletion()124   public void testFutureCancellableBeforeFunctionCompletion() throws Exception {
125     // Set the result in a separate thread since this test runs the function
126     // (which will block) in the same thread.
127     new Thread() {
128       @Override
129       public void run() {
130         inputFuture.set(SLOW_FUNC_VALID_INPUT_DATA);
131       }
132     }.start();
133     funcIsWaitingLatch.await();
134 
135     assertTrue(resultFuture.cancel(true));
136     assertTrue(resultFuture.isCancelled());
137     assertFalse(inputFuture.isCancelled());
138     assertFalse(outputFuture.isCancelled());
139     try {
140       resultFuture.get();
141       fail("Result future is cancelled and should have thrown a"
142           + " CancellationException");
143     } catch (CancellationException expected) {}
144 
145     funcCompletionLatch.countDown();  // allow the function to complete
146     try {
147       outputFuture.get();
148       fail("The function output future is cancelled and should have thrown a"
149           + " CancellationException");
150     } catch (CancellationException expected) {}
151   }
152 
testFutureCancelAfterCompletion()153   public void testFutureCancelAfterCompletion() throws Exception {
154     inputFuture.set(VALID_INPUT_DATA);
155     assertFalse(resultFuture.cancel(true));
156     assertFalse(resultFuture.isCancelled());
157     assertFalse(inputFuture.isCancelled());
158     assertFalse(outputFuture.isCancelled());
159     assertEquals(RESULT_DATA, resultFuture.get());
160   }
161 
testFutureGetThrowsRuntimeException()162   public void testFutureGetThrowsRuntimeException() throws Exception {
163     BadFuture badInput = new BadFuture(Futures.immediateFuture(20));
164     ListenableFuture<String> chain = buildChainingFuture(badInput);
165     try {
166       chain.get();
167       fail("Future.get must throw an exception when the input future fails.");
168     } catch (ExecutionException e) {
169       assertSame(RuntimeException.class, e.getCause().getClass());
170     }
171   }
172 
173   /**
174    * Proxy to throw a {@link RuntimeException} out of the {@link #get()} method.
175    */
176   public static class BadFuture
177       extends SimpleForwardingListenableFuture<Integer> {
BadFuture(ListenableFuture<Integer> delegate)178     protected BadFuture(ListenableFuture<Integer> delegate) {
179       super(delegate);
180     }
181 
182     @Override
get()183     public Integer get() {
184       throw new RuntimeException("Oops");
185     }
186   }
187 }
188