1 /* 2 * Copyright (C) 2008 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static com.google.common.util.concurrent.Futures.transformAsync; 21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 22 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; 23 24 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture; 25 import java.util.concurrent.CancellationException; 26 import java.util.concurrent.CountDownLatch; 27 import java.util.concurrent.ExecutionException; 28 29 /** 30 * Unit tests for {@link Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)}. 31 * 32 * @author Nishant Thakkar 33 */ 34 public class FuturesTransformAsyncTest extends AbstractChainedListenableFutureTest<String> { 35 protected static final int SLOW_OUTPUT_VALID_INPUT_DATA = 2; 36 protected static final int SLOW_FUNC_VALID_INPUT_DATA = 3; 37 private static final String RESULT_DATA = "SUCCESS"; 38 39 private SettableFuture<String> outputFuture; 40 // Signals that the function is waiting to complete 41 private CountDownLatch funcIsWaitingLatch; 42 // Signals the function so it will complete 43 private CountDownLatch funcCompletionLatch; 44 45 @Override buildChainingFuture(ListenableFuture<Integer> inputFuture)46 protected ListenableFuture<String> buildChainingFuture(ListenableFuture<Integer> inputFuture) { 47 outputFuture = SettableFuture.create(); 48 funcIsWaitingLatch = new CountDownLatch(1); 49 funcCompletionLatch = new CountDownLatch(1); 50 return transformAsync(inputFuture, new ChainingFunction(), directExecutor()); 51 } 52 53 @Override getSuccessfulResult()54 protected String getSuccessfulResult() { 55 return RESULT_DATA; 56 } 57 58 private class ChainingFunction implements AsyncFunction<Integer, String> { 59 @Override apply(Integer input)60 public ListenableFuture<String> apply(Integer input) throws Exception { 61 switch (input) { 62 case VALID_INPUT_DATA: 63 outputFuture.set(RESULT_DATA); 64 break; 65 case SLOW_OUTPUT_VALID_INPUT_DATA: 66 break; // do nothing to the result 67 case SLOW_FUNC_VALID_INPUT_DATA: 68 funcIsWaitingLatch.countDown(); 69 awaitUninterruptibly(funcCompletionLatch); 70 break; 71 case EXCEPTION_DATA: 72 throw EXCEPTION; 73 } 74 return outputFuture; 75 } 76 } 77 testFutureGetThrowsFunctionException()78 public void testFutureGetThrowsFunctionException() throws Exception { 79 inputFuture.set(EXCEPTION_DATA); 80 listener.assertException(EXCEPTION); 81 } 82 testFutureGetThrowsCancellationIfInputCancelled()83 public void testFutureGetThrowsCancellationIfInputCancelled() throws Exception { 84 inputFuture.cancel(true); // argument is ignored 85 try { 86 resultFuture.get(); 87 fail("Result future must throw CancellationException" + " if input future is cancelled."); 88 } catch (CancellationException expected) { 89 } 90 } 91 testFutureGetThrowsCancellationIfOutputCancelled()92 public void testFutureGetThrowsCancellationIfOutputCancelled() throws Exception { 93 inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA); 94 outputFuture.cancel(true); // argument is ignored 95 try { 96 resultFuture.get(); 97 fail( 98 "Result future must throw CancellationException" 99 + " if function output future is cancelled."); 100 } catch (CancellationException expected) { 101 } 102 } 103 testAsyncToString()104 public void testAsyncToString() throws Exception { 105 inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA); 106 assertThat(resultFuture.toString()).contains(outputFuture.toString()); 107 } 108 testFutureCancelBeforeInputCompletion()109 public void testFutureCancelBeforeInputCompletion() throws Exception { 110 assertTrue(resultFuture.cancel(true)); 111 assertTrue(resultFuture.isCancelled()); 112 assertTrue(inputFuture.isCancelled()); 113 assertFalse(outputFuture.isCancelled()); 114 try { 115 resultFuture.get(); 116 fail("Result future is cancelled and should have thrown a" + " CancellationException"); 117 } catch (CancellationException expected) { 118 } 119 } 120 testFutureCancellableBeforeOutputCompletion()121 public void testFutureCancellableBeforeOutputCompletion() throws Exception { 122 inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA); 123 assertTrue(resultFuture.cancel(true)); 124 assertTrue(resultFuture.isCancelled()); 125 assertFalse(inputFuture.isCancelled()); 126 assertTrue(outputFuture.isCancelled()); 127 try { 128 resultFuture.get(); 129 fail("Result future is cancelled and should have thrown a" + " CancellationException"); 130 } catch (CancellationException expected) { 131 } 132 } 133 134 testFutureCancellableBeforeFunctionCompletion()135 public void testFutureCancellableBeforeFunctionCompletion() throws Exception { 136 // Set the result in a separate thread since this test runs the function 137 // (which will block) in the same thread. 138 new Thread() { 139 @Override 140 public void run() { 141 inputFuture.set(SLOW_FUNC_VALID_INPUT_DATA); 142 } 143 }.start(); 144 funcIsWaitingLatch.await(); 145 146 assertTrue(resultFuture.cancel(true)); 147 assertTrue(resultFuture.isCancelled()); 148 assertFalse(inputFuture.isCancelled()); 149 assertFalse(outputFuture.isCancelled()); 150 try { 151 resultFuture.get(); 152 fail("Result future is cancelled and should have thrown a" + " CancellationException"); 153 } catch (CancellationException expected) { 154 } 155 156 funcCompletionLatch.countDown(); // allow the function to complete 157 try { 158 outputFuture.get(); 159 fail( 160 "The function output future is cancelled and should have thrown a" 161 + " CancellationException"); 162 } catch (CancellationException expected) { 163 } 164 } 165 testFutureCancelAfterCompletion()166 public void testFutureCancelAfterCompletion() throws Exception { 167 inputFuture.set(VALID_INPUT_DATA); 168 assertFalse(resultFuture.cancel(true)); 169 assertFalse(resultFuture.isCancelled()); 170 assertFalse(inputFuture.isCancelled()); 171 assertFalse(outputFuture.isCancelled()); 172 assertEquals(RESULT_DATA, resultFuture.get()); 173 } 174 testFutureGetThrowsRuntimeException()175 public void testFutureGetThrowsRuntimeException() throws Exception { 176 BadFuture badInput = new BadFuture(Futures.immediateFuture(20)); 177 ListenableFuture<String> chain = buildChainingFuture(badInput); 178 try { 179 chain.get(); 180 fail("Future.get must throw an exception when the input future fails."); 181 } catch (ExecutionException e) { 182 assertSame(RuntimeException.class, e.getCause().getClass()); 183 } 184 } 185 186 /** Proxy to throw a {@link RuntimeException} out of the {@link #get()} method. */ 187 public static class BadFuture extends SimpleForwardingListenableFuture<Integer> { BadFuture(ListenableFuture<Integer> delegate)188 protected BadFuture(ListenableFuture<Integer> delegate) { 189 super(delegate); 190 } 191 192 @Override get()193 public Integer get() { 194 throw new RuntimeException("Oops"); 195 } 196 } 197 } 198