• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2008 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static com.google.common.util.concurrent.Futures.transformAsync;
21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
22 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
23 
24 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
25 import java.util.concurrent.CancellationException;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 
29 /**
30  * Unit tests for {@link Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)}.
31  *
32  * @author Nishant Thakkar
33  */
34 public class FuturesTransformAsyncTest extends AbstractChainedListenableFutureTest<String> {
35   protected static final int SLOW_OUTPUT_VALID_INPUT_DATA = 2;
36   protected static final int SLOW_FUNC_VALID_INPUT_DATA = 3;
37   private static final String RESULT_DATA = "SUCCESS";
38 
39   private SettableFuture<String> outputFuture;
40   // Signals that the function is waiting to complete
41   private CountDownLatch funcIsWaitingLatch;
42   // Signals the function so it will complete
43   private CountDownLatch funcCompletionLatch;
44 
45   @Override
buildChainingFuture(ListenableFuture<Integer> inputFuture)46   protected ListenableFuture<String> buildChainingFuture(ListenableFuture<Integer> inputFuture) {
47     outputFuture = SettableFuture.create();
48     funcIsWaitingLatch = new CountDownLatch(1);
49     funcCompletionLatch = new CountDownLatch(1);
50     return transformAsync(inputFuture, new ChainingFunction(), directExecutor());
51   }
52 
53   @Override
getSuccessfulResult()54   protected String getSuccessfulResult() {
55     return RESULT_DATA;
56   }
57 
58   private class ChainingFunction implements AsyncFunction<Integer, String> {
59     @Override
apply(Integer input)60     public ListenableFuture<String> apply(Integer input) throws Exception {
61       switch (input) {
62         case VALID_INPUT_DATA:
63           outputFuture.set(RESULT_DATA);
64           break;
65         case SLOW_OUTPUT_VALID_INPUT_DATA:
66           break; // do nothing to the result
67         case SLOW_FUNC_VALID_INPUT_DATA:
68           funcIsWaitingLatch.countDown();
69           awaitUninterruptibly(funcCompletionLatch);
70           break;
71         case EXCEPTION_DATA:
72           throw EXCEPTION;
73       }
74       return outputFuture;
75     }
76   }
77 
testFutureGetThrowsFunctionException()78   public void testFutureGetThrowsFunctionException() throws Exception {
79     inputFuture.set(EXCEPTION_DATA);
80     listener.assertException(EXCEPTION);
81   }
82 
testFutureGetThrowsCancellationIfInputCancelled()83   public void testFutureGetThrowsCancellationIfInputCancelled() throws Exception {
84     inputFuture.cancel(true); // argument is ignored
85     try {
86       resultFuture.get();
87       fail("Result future must throw CancellationException" + " if input future is cancelled.");
88     } catch (CancellationException expected) {
89     }
90   }
91 
testFutureGetThrowsCancellationIfOutputCancelled()92   public void testFutureGetThrowsCancellationIfOutputCancelled() throws Exception {
93     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
94     outputFuture.cancel(true); // argument is ignored
95     try {
96       resultFuture.get();
97       fail(
98           "Result future must throw CancellationException"
99               + " if function output future is cancelled.");
100     } catch (CancellationException expected) {
101     }
102   }
103 
testAsyncToString()104   public void testAsyncToString() throws Exception {
105     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
106     assertThat(resultFuture.toString()).contains(outputFuture.toString());
107   }
108 
testFutureCancelBeforeInputCompletion()109   public void testFutureCancelBeforeInputCompletion() throws Exception {
110     assertTrue(resultFuture.cancel(true));
111     assertTrue(resultFuture.isCancelled());
112     assertTrue(inputFuture.isCancelled());
113     assertFalse(outputFuture.isCancelled());
114     try {
115       resultFuture.get();
116       fail("Result future is cancelled and should have thrown a" + " CancellationException");
117     } catch (CancellationException expected) {
118     }
119   }
120 
testFutureCancellableBeforeOutputCompletion()121   public void testFutureCancellableBeforeOutputCompletion() throws Exception {
122     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
123     assertTrue(resultFuture.cancel(true));
124     assertTrue(resultFuture.isCancelled());
125     assertFalse(inputFuture.isCancelled());
126     assertTrue(outputFuture.isCancelled());
127     try {
128       resultFuture.get();
129       fail("Result future is cancelled and should have thrown a" + " CancellationException");
130     } catch (CancellationException expected) {
131     }
132   }
133 
134 
testFutureCancellableBeforeFunctionCompletion()135   public void testFutureCancellableBeforeFunctionCompletion() throws Exception {
136     // Set the result in a separate thread since this test runs the function
137     // (which will block) in the same thread.
138     new Thread() {
139       @Override
140       public void run() {
141         inputFuture.set(SLOW_FUNC_VALID_INPUT_DATA);
142       }
143     }.start();
144     funcIsWaitingLatch.await();
145 
146     assertTrue(resultFuture.cancel(true));
147     assertTrue(resultFuture.isCancelled());
148     assertFalse(inputFuture.isCancelled());
149     assertFalse(outputFuture.isCancelled());
150     try {
151       resultFuture.get();
152       fail("Result future is cancelled and should have thrown a" + " CancellationException");
153     } catch (CancellationException expected) {
154     }
155 
156     funcCompletionLatch.countDown(); // allow the function to complete
157     try {
158       outputFuture.get();
159       fail(
160           "The function output future is cancelled and should have thrown a"
161               + " CancellationException");
162     } catch (CancellationException expected) {
163     }
164   }
165 
testFutureCancelAfterCompletion()166   public void testFutureCancelAfterCompletion() throws Exception {
167     inputFuture.set(VALID_INPUT_DATA);
168     assertFalse(resultFuture.cancel(true));
169     assertFalse(resultFuture.isCancelled());
170     assertFalse(inputFuture.isCancelled());
171     assertFalse(outputFuture.isCancelled());
172     assertEquals(RESULT_DATA, resultFuture.get());
173   }
174 
testFutureGetThrowsRuntimeException()175   public void testFutureGetThrowsRuntimeException() throws Exception {
176     BadFuture badInput = new BadFuture(Futures.immediateFuture(20));
177     ListenableFuture<String> chain = buildChainingFuture(badInput);
178     try {
179       chain.get();
180       fail("Future.get must throw an exception when the input future fails.");
181     } catch (ExecutionException e) {
182       assertSame(RuntimeException.class, e.getCause().getClass());
183     }
184   }
185 
186   /** Proxy to throw a {@link RuntimeException} out of the {@link #get()} method. */
187   public static class BadFuture extends SimpleForwardingListenableFuture<Integer> {
BadFuture(ListenableFuture<Integer> delegate)188     protected BadFuture(ListenableFuture<Integer> delegate) {
189       super(delegate);
190     }
191 
192     @Override
get()193     public Integer get() {
194       throw new RuntimeException("Oops");
195     }
196   }
197 }
198