1 /* 2 * Copyright (C) 2008 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static com.google.common.util.concurrent.Futures.transformAsync; 21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 22 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; 23 import static org.junit.Assert.assertThrows; 24 25 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture; 26 import java.util.concurrent.CancellationException; 27 import java.util.concurrent.CountDownLatch; 28 import java.util.concurrent.ExecutionException; 29 30 /** 31 * Unit tests for {@link Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)}. 32 * 33 * @author Nishant Thakkar 34 */ 35 public class FuturesTransformAsyncTest extends AbstractChainedListenableFutureTest<String> { 36 protected static final int SLOW_OUTPUT_VALID_INPUT_DATA = 2; 37 protected static final int SLOW_FUNC_VALID_INPUT_DATA = 3; 38 private static final String RESULT_DATA = "SUCCESS"; 39 40 private SettableFuture<String> outputFuture; 41 // Signals that the function is waiting to complete 42 private CountDownLatch funcIsWaitingLatch; 43 // Signals the function so it will complete 44 private CountDownLatch funcCompletionLatch; 45 46 @Override buildChainingFuture(ListenableFuture<Integer> inputFuture)47 protected ListenableFuture<String> buildChainingFuture(ListenableFuture<Integer> inputFuture) { 48 outputFuture = SettableFuture.create(); 49 funcIsWaitingLatch = new CountDownLatch(1); 50 funcCompletionLatch = new CountDownLatch(1); 51 return transformAsync(inputFuture, new ChainingFunction(), directExecutor()); 52 } 53 54 @Override getSuccessfulResult()55 protected String getSuccessfulResult() { 56 return RESULT_DATA; 57 } 58 59 private class ChainingFunction implements AsyncFunction<Integer, String> { 60 @Override apply(Integer input)61 public ListenableFuture<String> apply(Integer input) throws Exception { 62 switch (input) { 63 case VALID_INPUT_DATA: 64 outputFuture.set(RESULT_DATA); 65 break; 66 case SLOW_OUTPUT_VALID_INPUT_DATA: 67 break; // do nothing to the result 68 case SLOW_FUNC_VALID_INPUT_DATA: 69 funcIsWaitingLatch.countDown(); 70 awaitUninterruptibly(funcCompletionLatch); 71 break; 72 case EXCEPTION_DATA: 73 throw EXCEPTION; 74 } 75 return outputFuture; 76 } 77 } 78 testFutureGetThrowsFunctionException()79 public void testFutureGetThrowsFunctionException() throws Exception { 80 inputFuture.set(EXCEPTION_DATA); 81 listener.assertException(EXCEPTION); 82 } 83 testFutureGetThrowsCancellationIfInputCancelled()84 public void testFutureGetThrowsCancellationIfInputCancelled() throws Exception { 85 inputFuture.cancel(true); // argument is ignored 86 assertThrows(CancellationException.class, () -> resultFuture.get()); 87 } 88 testFutureGetThrowsCancellationIfOutputCancelled()89 public void testFutureGetThrowsCancellationIfOutputCancelled() throws Exception { 90 inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA); 91 outputFuture.cancel(true); // argument is ignored 92 assertThrows(CancellationException.class, () -> resultFuture.get()); 93 } 94 testAsyncToString()95 public void testAsyncToString() throws Exception { 96 inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA); 97 assertThat(resultFuture.toString()).contains(outputFuture.toString()); 98 } 99 testFutureCancelBeforeInputCompletion()100 public void testFutureCancelBeforeInputCompletion() throws Exception { 101 assertTrue(resultFuture.cancel(true)); 102 assertTrue(resultFuture.isCancelled()); 103 assertTrue(inputFuture.isCancelled()); 104 assertFalse(outputFuture.isCancelled()); 105 assertThrows(CancellationException.class, () -> resultFuture.get()); 106 } 107 testFutureCancellableBeforeOutputCompletion()108 public void testFutureCancellableBeforeOutputCompletion() throws Exception { 109 inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA); 110 assertTrue(resultFuture.cancel(true)); 111 assertTrue(resultFuture.isCancelled()); 112 assertFalse(inputFuture.isCancelled()); 113 assertTrue(outputFuture.isCancelled()); 114 assertThrows(CancellationException.class, () -> resultFuture.get()); 115 } 116 testFutureCancellableBeforeFunctionCompletion()117 public void testFutureCancellableBeforeFunctionCompletion() throws Exception { 118 // Set the result in a separate thread since this test runs the function 119 // (which will block) in the same thread. 120 new Thread() { 121 @Override 122 public void run() { 123 inputFuture.set(SLOW_FUNC_VALID_INPUT_DATA); 124 } 125 }.start(); 126 funcIsWaitingLatch.await(); 127 128 assertTrue(resultFuture.cancel(true)); 129 assertTrue(resultFuture.isCancelled()); 130 assertFalse(inputFuture.isCancelled()); 131 assertFalse(outputFuture.isCancelled()); 132 assertThrows(CancellationException.class, () -> resultFuture.get()); 133 134 funcCompletionLatch.countDown(); // allow the function to complete 135 assertThrows(CancellationException.class, () -> outputFuture.get()); 136 } 137 testFutureCancelAfterCompletion()138 public void testFutureCancelAfterCompletion() throws Exception { 139 inputFuture.set(VALID_INPUT_DATA); 140 assertFalse(resultFuture.cancel(true)); 141 assertFalse(resultFuture.isCancelled()); 142 assertFalse(inputFuture.isCancelled()); 143 assertFalse(outputFuture.isCancelled()); 144 assertEquals(RESULT_DATA, resultFuture.get()); 145 } 146 testFutureGetThrowsRuntimeException()147 public void testFutureGetThrowsRuntimeException() throws Exception { 148 BadFuture badInput = new BadFuture(Futures.immediateFuture(20)); 149 ListenableFuture<String> chain = buildChainingFuture(badInput); 150 ExecutionException e = assertThrows(ExecutionException.class, () -> chain.get()); 151 assertSame(RuntimeException.class, e.getCause().getClass()); 152 } 153 154 /** Proxy to throw a {@link RuntimeException} out of the {@link #get()} method. */ 155 public static class BadFuture extends SimpleForwardingListenableFuture<Integer> { BadFuture(ListenableFuture<Integer> delegate)156 protected BadFuture(ListenableFuture<Integer> delegate) { 157 super(delegate); 158 } 159 160 @Override get()161 public Integer get() { 162 throw new RuntimeException("Oops"); 163 } 164 } 165 } 166