• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2008 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static com.google.common.util.concurrent.Futures.transformAsync;
21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
22 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
23 import static org.junit.Assert.assertThrows;
24 
25 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
26 import java.util.concurrent.CancellationException;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 
30 /**
31  * Unit tests for {@link Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)}.
32  *
33  * @author Nishant Thakkar
34  */
35 public class FuturesTransformAsyncTest extends AbstractChainedListenableFutureTest<String> {
36   protected static final int SLOW_OUTPUT_VALID_INPUT_DATA = 2;
37   protected static final int SLOW_FUNC_VALID_INPUT_DATA = 3;
38   private static final String RESULT_DATA = "SUCCESS";
39 
40   private SettableFuture<String> outputFuture;
41   // Signals that the function is waiting to complete
42   private CountDownLatch funcIsWaitingLatch;
43   // Signals the function so it will complete
44   private CountDownLatch funcCompletionLatch;
45 
46   @Override
buildChainingFuture(ListenableFuture<Integer> inputFuture)47   protected ListenableFuture<String> buildChainingFuture(ListenableFuture<Integer> inputFuture) {
48     outputFuture = SettableFuture.create();
49     funcIsWaitingLatch = new CountDownLatch(1);
50     funcCompletionLatch = new CountDownLatch(1);
51     return transformAsync(inputFuture, new ChainingFunction(), directExecutor());
52   }
53 
54   @Override
getSuccessfulResult()55   protected String getSuccessfulResult() {
56     return RESULT_DATA;
57   }
58 
59   private class ChainingFunction implements AsyncFunction<Integer, String> {
60     @Override
apply(Integer input)61     public ListenableFuture<String> apply(Integer input) throws Exception {
62       switch (input) {
63         case VALID_INPUT_DATA:
64           outputFuture.set(RESULT_DATA);
65           break;
66         case SLOW_OUTPUT_VALID_INPUT_DATA:
67           break; // do nothing to the result
68         case SLOW_FUNC_VALID_INPUT_DATA:
69           funcIsWaitingLatch.countDown();
70           awaitUninterruptibly(funcCompletionLatch);
71           break;
72         case EXCEPTION_DATA:
73           throw EXCEPTION;
74       }
75       return outputFuture;
76     }
77   }
78 
testFutureGetThrowsFunctionException()79   public void testFutureGetThrowsFunctionException() throws Exception {
80     inputFuture.set(EXCEPTION_DATA);
81     listener.assertException(EXCEPTION);
82   }
83 
testFutureGetThrowsCancellationIfInputCancelled()84   public void testFutureGetThrowsCancellationIfInputCancelled() throws Exception {
85     inputFuture.cancel(true); // argument is ignored
86     assertThrows(CancellationException.class, () -> resultFuture.get());
87   }
88 
testFutureGetThrowsCancellationIfOutputCancelled()89   public void testFutureGetThrowsCancellationIfOutputCancelled() throws Exception {
90     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
91     outputFuture.cancel(true); // argument is ignored
92     assertThrows(CancellationException.class, () -> resultFuture.get());
93   }
94 
testAsyncToString()95   public void testAsyncToString() throws Exception {
96     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
97     assertThat(resultFuture.toString()).contains(outputFuture.toString());
98   }
99 
testFutureCancelBeforeInputCompletion()100   public void testFutureCancelBeforeInputCompletion() throws Exception {
101     assertTrue(resultFuture.cancel(true));
102     assertTrue(resultFuture.isCancelled());
103     assertTrue(inputFuture.isCancelled());
104     assertFalse(outputFuture.isCancelled());
105     assertThrows(CancellationException.class, () -> resultFuture.get());
106   }
107 
testFutureCancellableBeforeOutputCompletion()108   public void testFutureCancellableBeforeOutputCompletion() throws Exception {
109     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
110     assertTrue(resultFuture.cancel(true));
111     assertTrue(resultFuture.isCancelled());
112     assertFalse(inputFuture.isCancelled());
113     assertTrue(outputFuture.isCancelled());
114     assertThrows(CancellationException.class, () -> resultFuture.get());
115   }
116 
testFutureCancellableBeforeFunctionCompletion()117   public void testFutureCancellableBeforeFunctionCompletion() throws Exception {
118     // Set the result in a separate thread since this test runs the function
119     // (which will block) in the same thread.
120     new Thread() {
121       @Override
122       public void run() {
123         inputFuture.set(SLOW_FUNC_VALID_INPUT_DATA);
124       }
125     }.start();
126     funcIsWaitingLatch.await();
127 
128     assertTrue(resultFuture.cancel(true));
129     assertTrue(resultFuture.isCancelled());
130     assertFalse(inputFuture.isCancelled());
131     assertFalse(outputFuture.isCancelled());
132     assertThrows(CancellationException.class, () -> resultFuture.get());
133 
134     funcCompletionLatch.countDown(); // allow the function to complete
135     assertThrows(CancellationException.class, () -> outputFuture.get());
136   }
137 
testFutureCancelAfterCompletion()138   public void testFutureCancelAfterCompletion() throws Exception {
139     inputFuture.set(VALID_INPUT_DATA);
140     assertFalse(resultFuture.cancel(true));
141     assertFalse(resultFuture.isCancelled());
142     assertFalse(inputFuture.isCancelled());
143     assertFalse(outputFuture.isCancelled());
144     assertEquals(RESULT_DATA, resultFuture.get());
145   }
146 
testFutureGetThrowsRuntimeException()147   public void testFutureGetThrowsRuntimeException() throws Exception {
148     BadFuture badInput = new BadFuture(Futures.immediateFuture(20));
149     ListenableFuture<String> chain = buildChainingFuture(badInput);
150     ExecutionException e = assertThrows(ExecutionException.class, () -> chain.get());
151     assertSame(RuntimeException.class, e.getCause().getClass());
152   }
153 
154   /** Proxy to throw a {@link RuntimeException} out of the {@link #get()} method. */
155   public static class BadFuture extends SimpleForwardingListenableFuture<Integer> {
BadFuture(ListenableFuture<Integer> delegate)156     protected BadFuture(ListenableFuture<Integer> delegate) {
157       super(delegate);
158     }
159 
160     @Override
get()161     public Integer get() {
162       throw new RuntimeException("Oops");
163     }
164   }
165 }
166