• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2008 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static com.google.common.util.concurrent.Futures.transformAsync;
21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
22 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
23 
24 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
25 import java.util.concurrent.CancellationException;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 
29 /**
30  * Unit tests for {@link Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)}.
31  *
32  * @author Nishant Thakkar
33  */
34 public class FuturesTransformAsyncTest extends AbstractChainedListenableFutureTest<String> {
35   protected static final int SLOW_OUTPUT_VALID_INPUT_DATA = 2;
36   protected static final int SLOW_FUNC_VALID_INPUT_DATA = 3;
37   private static final String RESULT_DATA = "SUCCESS";
38 
39   private SettableFuture<String> outputFuture;
40   // Signals that the function is waiting to complete
41   private CountDownLatch funcIsWaitingLatch;
42   // Signals the function so it will complete
43   private CountDownLatch funcCompletionLatch;
44 
45   @Override
buildChainingFuture(ListenableFuture<Integer> inputFuture)46   protected ListenableFuture<String> buildChainingFuture(ListenableFuture<Integer> inputFuture) {
47     outputFuture = SettableFuture.create();
48     funcIsWaitingLatch = new CountDownLatch(1);
49     funcCompletionLatch = new CountDownLatch(1);
50     return transformAsync(inputFuture, new ChainingFunction(), directExecutor());
51   }
52 
53   @Override
getSuccessfulResult()54   protected String getSuccessfulResult() {
55     return RESULT_DATA;
56   }
57 
58   private class ChainingFunction implements AsyncFunction<Integer, String> {
59     @Override
apply(Integer input)60     public ListenableFuture<String> apply(Integer input) throws Exception {
61       switch (input) {
62         case VALID_INPUT_DATA:
63           outputFuture.set(RESULT_DATA);
64           break;
65         case SLOW_OUTPUT_VALID_INPUT_DATA:
66           break; // do nothing to the result
67         case SLOW_FUNC_VALID_INPUT_DATA:
68           funcIsWaitingLatch.countDown();
69           awaitUninterruptibly(funcCompletionLatch);
70           break;
71         case EXCEPTION_DATA:
72           throw EXCEPTION;
73       }
74       return outputFuture;
75     }
76   }
77 
testFutureGetThrowsFunctionException()78   public void testFutureGetThrowsFunctionException() throws Exception {
79     inputFuture.set(EXCEPTION_DATA);
80     listener.assertException(EXCEPTION);
81   }
82 
testFutureGetThrowsCancellationIfInputCancelled()83   public void testFutureGetThrowsCancellationIfInputCancelled() throws Exception {
84     inputFuture.cancel(true); // argument is ignored
85     try {
86       resultFuture.get();
87       fail("Result future must throw CancellationException" + " if input future is cancelled.");
88     } catch (CancellationException expected) {
89     }
90   }
91 
testFutureGetThrowsCancellationIfOutputCancelled()92   public void testFutureGetThrowsCancellationIfOutputCancelled() throws Exception {
93     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
94     outputFuture.cancel(true); // argument is ignored
95     try {
96       resultFuture.get();
97       fail(
98           "Result future must throw CancellationException"
99               + " if function output future is cancelled.");
100     } catch (CancellationException expected) {
101     }
102   }
103 
testAsyncToString()104   public void testAsyncToString() throws Exception {
105     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
106     assertThat(resultFuture.toString()).contains(outputFuture.toString());
107   }
108 
testFutureCancelBeforeInputCompletion()109   public void testFutureCancelBeforeInputCompletion() throws Exception {
110     assertTrue(resultFuture.cancel(true));
111     assertTrue(resultFuture.isCancelled());
112     assertTrue(inputFuture.isCancelled());
113     assertFalse(outputFuture.isCancelled());
114     try {
115       resultFuture.get();
116       fail("Result future is cancelled and should have thrown a" + " CancellationException");
117     } catch (CancellationException expected) {
118     }
119   }
120 
testFutureCancellableBeforeOutputCompletion()121   public void testFutureCancellableBeforeOutputCompletion() throws Exception {
122     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
123     assertTrue(resultFuture.cancel(true));
124     assertTrue(resultFuture.isCancelled());
125     assertFalse(inputFuture.isCancelled());
126     assertTrue(outputFuture.isCancelled());
127     try {
128       resultFuture.get();
129       fail("Result future is cancelled and should have thrown a" + " CancellationException");
130     } catch (CancellationException expected) {
131     }
132   }
133 
testFutureCancellableBeforeFunctionCompletion()134   public void testFutureCancellableBeforeFunctionCompletion() throws Exception {
135     // Set the result in a separate thread since this test runs the function
136     // (which will block) in the same thread.
137     new Thread() {
138       @Override
139       public void run() {
140         inputFuture.set(SLOW_FUNC_VALID_INPUT_DATA);
141       }
142     }.start();
143     funcIsWaitingLatch.await();
144 
145     assertTrue(resultFuture.cancel(true));
146     assertTrue(resultFuture.isCancelled());
147     assertFalse(inputFuture.isCancelled());
148     assertFalse(outputFuture.isCancelled());
149     try {
150       resultFuture.get();
151       fail("Result future is cancelled and should have thrown a" + " CancellationException");
152     } catch (CancellationException expected) {
153     }
154 
155     funcCompletionLatch.countDown(); // allow the function to complete
156     try {
157       outputFuture.get();
158       fail(
159           "The function output future is cancelled and should have thrown a"
160               + " CancellationException");
161     } catch (CancellationException expected) {
162     }
163   }
164 
testFutureCancelAfterCompletion()165   public void testFutureCancelAfterCompletion() throws Exception {
166     inputFuture.set(VALID_INPUT_DATA);
167     assertFalse(resultFuture.cancel(true));
168     assertFalse(resultFuture.isCancelled());
169     assertFalse(inputFuture.isCancelled());
170     assertFalse(outputFuture.isCancelled());
171     assertEquals(RESULT_DATA, resultFuture.get());
172   }
173 
testFutureGetThrowsRuntimeException()174   public void testFutureGetThrowsRuntimeException() throws Exception {
175     BadFuture badInput = new BadFuture(Futures.immediateFuture(20));
176     ListenableFuture<String> chain = buildChainingFuture(badInput);
177     try {
178       chain.get();
179       fail("Future.get must throw an exception when the input future fails.");
180     } catch (ExecutionException e) {
181       assertSame(RuntimeException.class, e.getCause().getClass());
182     }
183   }
184 
185   /** Proxy to throw a {@link RuntimeException} out of the {@link #get()} method. */
186   public static class BadFuture extends SimpleForwardingListenableFuture<Integer> {
BadFuture(ListenableFuture<Integer> delegate)187     protected BadFuture(ListenableFuture<Integer> delegate) {
188       super(delegate);
189     }
190 
191     @Override
get()192     public Integer get() {
193       throw new RuntimeException("Oops");
194     }
195   }
196 }
197