1 /* 2 * Copyright (C) 2008 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static com.google.common.util.concurrent.Futures.transformAsync; 21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 22 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; 23 24 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture; 25 import java.util.concurrent.CancellationException; 26 import java.util.concurrent.CountDownLatch; 27 import java.util.concurrent.ExecutionException; 28 29 /** 30 * Unit tests for {@link Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)}. 31 * 32 * @author Nishant Thakkar 33 */ 34 public class FuturesTransformAsyncTest extends AbstractChainedListenableFutureTest<String> { 35 protected static final int SLOW_OUTPUT_VALID_INPUT_DATA = 2; 36 protected static final int SLOW_FUNC_VALID_INPUT_DATA = 3; 37 private static final String RESULT_DATA = "SUCCESS"; 38 39 private SettableFuture<String> outputFuture; 40 // Signals that the function is waiting to complete 41 private CountDownLatch funcIsWaitingLatch; 42 // Signals the function so it will complete 43 private CountDownLatch funcCompletionLatch; 44 45 @Override buildChainingFuture(ListenableFuture<Integer> inputFuture)46 protected ListenableFuture<String> buildChainingFuture(ListenableFuture<Integer> inputFuture) { 47 outputFuture = SettableFuture.create(); 48 funcIsWaitingLatch = new CountDownLatch(1); 49 funcCompletionLatch = new CountDownLatch(1); 50 return transformAsync(inputFuture, new ChainingFunction(), directExecutor()); 51 } 52 53 @Override getSuccessfulResult()54 protected String getSuccessfulResult() { 55 return RESULT_DATA; 56 } 57 58 private class ChainingFunction implements AsyncFunction<Integer, String> { 59 @Override apply(Integer input)60 public ListenableFuture<String> apply(Integer input) throws Exception { 61 switch (input) { 62 case VALID_INPUT_DATA: 63 outputFuture.set(RESULT_DATA); 64 break; 65 case SLOW_OUTPUT_VALID_INPUT_DATA: 66 break; // do nothing to the result 67 case SLOW_FUNC_VALID_INPUT_DATA: 68 funcIsWaitingLatch.countDown(); 69 awaitUninterruptibly(funcCompletionLatch); 70 break; 71 case EXCEPTION_DATA: 72 throw EXCEPTION; 73 } 74 return outputFuture; 75 } 76 } 77 testFutureGetThrowsFunctionException()78 public void testFutureGetThrowsFunctionException() throws Exception { 79 inputFuture.set(EXCEPTION_DATA); 80 listener.assertException(EXCEPTION); 81 } 82 testFutureGetThrowsCancellationIfInputCancelled()83 public void testFutureGetThrowsCancellationIfInputCancelled() throws Exception { 84 inputFuture.cancel(true); // argument is ignored 85 try { 86 resultFuture.get(); 87 fail("Result future must throw CancellationException" + " if input future is cancelled."); 88 } catch (CancellationException expected) { 89 } 90 } 91 testFutureGetThrowsCancellationIfOutputCancelled()92 public void testFutureGetThrowsCancellationIfOutputCancelled() throws Exception { 93 inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA); 94 outputFuture.cancel(true); // argument is ignored 95 try { 96 resultFuture.get(); 97 fail( 98 "Result future must throw CancellationException" 99 + " if function output future is cancelled."); 100 } catch (CancellationException expected) { 101 } 102 } 103 testAsyncToString()104 public void testAsyncToString() throws Exception { 105 inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA); 106 assertThat(resultFuture.toString()).contains(outputFuture.toString()); 107 } 108 testFutureCancelBeforeInputCompletion()109 public void testFutureCancelBeforeInputCompletion() throws Exception { 110 assertTrue(resultFuture.cancel(true)); 111 assertTrue(resultFuture.isCancelled()); 112 assertTrue(inputFuture.isCancelled()); 113 assertFalse(outputFuture.isCancelled()); 114 try { 115 resultFuture.get(); 116 fail("Result future is cancelled and should have thrown a" + " CancellationException"); 117 } catch (CancellationException expected) { 118 } 119 } 120 testFutureCancellableBeforeOutputCompletion()121 public void testFutureCancellableBeforeOutputCompletion() throws Exception { 122 inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA); 123 assertTrue(resultFuture.cancel(true)); 124 assertTrue(resultFuture.isCancelled()); 125 assertFalse(inputFuture.isCancelled()); 126 assertTrue(outputFuture.isCancelled()); 127 try { 128 resultFuture.get(); 129 fail("Result future is cancelled and should have thrown a" + " CancellationException"); 130 } catch (CancellationException expected) { 131 } 132 } 133 testFutureCancellableBeforeFunctionCompletion()134 public void testFutureCancellableBeforeFunctionCompletion() throws Exception { 135 // Set the result in a separate thread since this test runs the function 136 // (which will block) in the same thread. 137 new Thread() { 138 @Override 139 public void run() { 140 inputFuture.set(SLOW_FUNC_VALID_INPUT_DATA); 141 } 142 }.start(); 143 funcIsWaitingLatch.await(); 144 145 assertTrue(resultFuture.cancel(true)); 146 assertTrue(resultFuture.isCancelled()); 147 assertFalse(inputFuture.isCancelled()); 148 assertFalse(outputFuture.isCancelled()); 149 try { 150 resultFuture.get(); 151 fail("Result future is cancelled and should have thrown a" + " CancellationException"); 152 } catch (CancellationException expected) { 153 } 154 155 funcCompletionLatch.countDown(); // allow the function to complete 156 try { 157 outputFuture.get(); 158 fail( 159 "The function output future is cancelled and should have thrown a" 160 + " CancellationException"); 161 } catch (CancellationException expected) { 162 } 163 } 164 testFutureCancelAfterCompletion()165 public void testFutureCancelAfterCompletion() throws Exception { 166 inputFuture.set(VALID_INPUT_DATA); 167 assertFalse(resultFuture.cancel(true)); 168 assertFalse(resultFuture.isCancelled()); 169 assertFalse(inputFuture.isCancelled()); 170 assertFalse(outputFuture.isCancelled()); 171 assertEquals(RESULT_DATA, resultFuture.get()); 172 } 173 testFutureGetThrowsRuntimeException()174 public void testFutureGetThrowsRuntimeException() throws Exception { 175 BadFuture badInput = new BadFuture(Futures.immediateFuture(20)); 176 ListenableFuture<String> chain = buildChainingFuture(badInput); 177 try { 178 chain.get(); 179 fail("Future.get must throw an exception when the input future fails."); 180 } catch (ExecutionException e) { 181 assertSame(RuntimeException.class, e.getCause().getClass()); 182 } 183 } 184 185 /** Proxy to throw a {@link RuntimeException} out of the {@link #get()} method. */ 186 public static class BadFuture extends SimpleForwardingListenableFuture<Integer> { BadFuture(ListenableFuture<Integer> delegate)187 protected BadFuture(ListenableFuture<Integer> delegate) { 188 super(delegate); 189 } 190 191 @Override get()192 public Integer get() { 193 throw new RuntimeException("Oops"); 194 } 195 } 196 } 197