1 /* 2 * Copyright (C) 2007 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import java.util.concurrent.CancellationException; 22 import java.util.concurrent.ExecutionException; 23 import java.util.concurrent.Executor; 24 import java.util.concurrent.TimeUnit; 25 import java.util.concurrent.TimeoutException; 26 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 27 28 import javax.annotation.Nullable; 29 30 /** 31 * An abstract implementation of the {@link ListenableFuture} interface. This 32 * class is preferable to {@link java.util.concurrent.FutureTask} for two 33 * reasons: It implements {@code ListenableFuture}, and it does not implement 34 * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code 35 * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your 36 * tasks to a {@link ListeningExecutorService}.) 37 * 38 * <p>This class implements all methods in {@code ListenableFuture}. 39 * Subclasses should provide a way to set the result of the computation through 40 * the protected methods {@link #set(Object)} and 41 * {@link #setException(Throwable)}. Subclasses may also override {@link 42 * #interruptTask()}, which will be invoked automatically if a call to {@link 43 * #cancel(boolean) cancel(true)} succeeds in canceling the future. 44 * 45 * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal 46 * with concurrency issues and guarantee thread safety. 47 * 48 * <p>The state changing methods all return a boolean indicating success or 49 * failure in changing the future's state. Valid states are running, 50 * completed, failed, or cancelled. 51 * 52 * <p>This class uses an {@link ExecutionList} to guarantee that all registered 53 * listeners will be executed, either when the future finishes or, for listeners 54 * that are added after the future completes, immediately. 55 * {@code Runnable}-{@code Executor} pairs are stored in the execution list but 56 * are not necessarily executed in the order in which they were added. (If a 57 * listener is added after the Future is complete, it will be executed 58 * immediately, even if earlier listeners have not been executed. Additionally, 59 * executors need not guarantee FIFO execution, or different listeners may run 60 * in different executors.) 61 * 62 * @author Sven Mawson 63 * @since 1.0 64 */ 65 public abstract class AbstractFuture<V> implements ListenableFuture<V> { 66 67 /** Synchronization control for AbstractFutures. */ 68 private final Sync<V> sync = new Sync<V>(); 69 70 // The execution list to hold our executors. 71 private final ExecutionList executionList = new ExecutionList(); 72 73 /* 74 * Improve the documentation of when InterruptedException is thrown. Our 75 * behavior matches the JDK's, but the JDK's documentation is misleading. 76 */ 77 /** 78 * {@inheritDoc} 79 * 80 * <p>The default {@link AbstractFuture} implementation throws {@code 81 * InterruptedException} if the current thread is interrupted before or during 82 * the call, even if the value is already available. 83 * 84 * @throws InterruptedException if the current thread was interrupted before 85 * or during the call (optional but recommended). 86 * @throws CancellationException {@inheritDoc} 87 */ 88 @Override get(long timeout, TimeUnit unit)89 public V get(long timeout, TimeUnit unit) throws InterruptedException, 90 TimeoutException, ExecutionException { 91 return sync.get(unit.toNanos(timeout)); 92 } 93 94 /* 95 * Improve the documentation of when InterruptedException is thrown. Our 96 * behavior matches the JDK's, but the JDK's documentation is misleading. 97 */ 98 /** 99 * {@inheritDoc} 100 * 101 * <p>The default {@link AbstractFuture} implementation throws {@code 102 * InterruptedException} if the current thread is interrupted before or during 103 * the call, even if the value is already available. 104 * 105 * @throws InterruptedException if the current thread was interrupted before 106 * or during the call (optional but recommended). 107 * @throws CancellationException {@inheritDoc} 108 */ 109 @Override get()110 public V get() throws InterruptedException, ExecutionException { 111 return sync.get(); 112 } 113 114 @Override isDone()115 public boolean isDone() { 116 return sync.isDone(); 117 } 118 119 @Override isCancelled()120 public boolean isCancelled() { 121 return sync.isCancelled(); 122 } 123 124 @Override cancel(boolean mayInterruptIfRunning)125 public boolean cancel(boolean mayInterruptIfRunning) { 126 if (!sync.cancel()) { 127 return false; 128 } 129 executionList.execute(); 130 if (mayInterruptIfRunning) { 131 interruptTask(); 132 } 133 return true; 134 } 135 136 /** 137 * Subclasses can override this method to implement interruption of the 138 * future's computation. The method is invoked automatically by a successful 139 * call to {@link #cancel(boolean) cancel(true)}. 140 * 141 * <p>The default implementation does nothing. 142 * 143 * @since 10.0 144 */ interruptTask()145 protected void interruptTask() { 146 } 147 148 /** 149 * {@inheritDoc} 150 * 151 * @since 10.0 152 */ 153 @Override addListener(Runnable listener, Executor exec)154 public void addListener(Runnable listener, Executor exec) { 155 executionList.add(listener, exec); 156 } 157 158 /** 159 * Subclasses should invoke this method to set the result of the computation 160 * to {@code value}. This will set the state of the future to 161 * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the 162 * state was successfully changed. 163 * 164 * @param value the value that was the result of the task. 165 * @return true if the state was successfully changed. 166 */ set(@ullable V value)167 protected boolean set(@Nullable V value) { 168 boolean result = sync.set(value); 169 if (result) { 170 executionList.execute(); 171 } 172 return result; 173 } 174 175 /** 176 * Subclasses should invoke this method to set the result of the computation 177 * to an error, {@code throwable}. This will set the state of the future to 178 * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the 179 * state was successfully changed. 180 * 181 * @param throwable the exception that the task failed with. 182 * @return true if the state was successfully changed. 183 * @throws Error if the throwable was an {@link Error}. 184 */ setException(Throwable throwable)185 protected boolean setException(Throwable throwable) { 186 boolean result = sync.setException(checkNotNull(throwable)); 187 if (result) { 188 executionList.execute(); 189 } 190 191 // If it's an Error, we want to make sure it reaches the top of the 192 // call stack, so we rethrow it. 193 if (throwable instanceof Error) { 194 throw (Error) throwable; 195 } 196 return result; 197 } 198 199 /** 200 * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a 201 * private subclass to hold the synchronizer. This synchronizer is used to 202 * implement the blocking and waiting calls as well as to handle state changes 203 * in a thread-safe manner. The current state of the future is held in the 204 * Sync state, and the lock is released whenever the state changes to either 205 * {@link #COMPLETED} or {@link #CANCELLED}. 206 * 207 * <p>To avoid races between threads doing release and acquire, we transition 208 * to the final state in two steps. One thread will successfully CAS from 209 * RUNNING to COMPLETING, that thread will then set the result of the 210 * computation, and only then transition to COMPLETED or CANCELLED. 211 * 212 * <p>We don't use the integer argument passed between acquire methods so we 213 * pass around a -1 everywhere. 214 */ 215 static final class Sync<V> extends AbstractQueuedSynchronizer { 216 217 private static final long serialVersionUID = 0L; 218 219 /* Valid states. */ 220 static final int RUNNING = 0; 221 static final int COMPLETING = 1; 222 static final int COMPLETED = 2; 223 static final int CANCELLED = 4; 224 225 private V value; 226 private Throwable exception; 227 228 /* 229 * Acquisition succeeds if the future is done, otherwise it fails. 230 */ 231 @Override tryAcquireShared(int ignored)232 protected int tryAcquireShared(int ignored) { 233 if (isDone()) { 234 return 1; 235 } 236 return -1; 237 } 238 239 /* 240 * We always allow a release to go through, this means the state has been 241 * successfully changed and the result is available. 242 */ 243 @Override tryReleaseShared(int finalState)244 protected boolean tryReleaseShared(int finalState) { 245 setState(finalState); 246 return true; 247 } 248 249 /** 250 * Blocks until the task is complete or the timeout expires. Throws a 251 * {@link TimeoutException} if the timer expires, otherwise behaves like 252 * {@link #get()}. 253 */ get(long nanos)254 V get(long nanos) throws TimeoutException, CancellationException, 255 ExecutionException, InterruptedException { 256 257 // Attempt to acquire the shared lock with a timeout. 258 if (!tryAcquireSharedNanos(-1, nanos)) { 259 throw new TimeoutException("Timeout waiting for task."); 260 } 261 262 return getValue(); 263 } 264 265 /** 266 * Blocks until {@link #complete(Object, Throwable, int)} has been 267 * successfully called. Throws a {@link CancellationException} if the task 268 * was cancelled, or a {@link ExecutionException} if the task completed with 269 * an error. 270 */ get()271 V get() throws CancellationException, ExecutionException, 272 InterruptedException { 273 274 // Acquire the shared lock allowing interruption. 275 acquireSharedInterruptibly(-1); 276 return getValue(); 277 } 278 279 /** 280 * Implementation of the actual value retrieval. Will return the value 281 * on success, an exception on failure, a cancellation on cancellation, or 282 * an illegal state if the synchronizer is in an invalid state. 283 */ getValue()284 private V getValue() throws CancellationException, ExecutionException { 285 int state = getState(); 286 switch (state) { 287 case COMPLETED: 288 if (exception != null) { 289 throw new ExecutionException(exception); 290 } else { 291 return value; 292 } 293 294 case CANCELLED: 295 throw new CancellationException("Task was cancelled."); 296 297 default: 298 throw new IllegalStateException( 299 "Error, synchronizer in invalid state: " + state); 300 } 301 } 302 303 /** 304 * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}. 305 */ isDone()306 boolean isDone() { 307 return (getState() & (COMPLETED | CANCELLED)) != 0; 308 } 309 310 /** 311 * Checks if the state is {@link #CANCELLED}. 312 */ isCancelled()313 boolean isCancelled() { 314 return getState() == CANCELLED; 315 } 316 317 /** 318 * Transition to the COMPLETED state and set the value. 319 */ set(@ullable V v)320 boolean set(@Nullable V v) { 321 return complete(v, null, COMPLETED); 322 } 323 324 /** 325 * Transition to the COMPLETED state and set the exception. 326 */ setException(Throwable t)327 boolean setException(Throwable t) { 328 return complete(null, t, COMPLETED); 329 } 330 331 /** 332 * Transition to the CANCELLED state. 333 */ cancel()334 boolean cancel() { 335 return complete(null, null, CANCELLED); 336 } 337 338 /** 339 * Implementation of completing a task. Either {@code v} or {@code t} will 340 * be set but not both. The {@code finalState} is the state to change to 341 * from {@link #RUNNING}. If the state is not in the RUNNING state we 342 * return {@code false} after waiting for the state to be set to a valid 343 * final state ({@link #COMPLETED} or {@link #CANCELLED}). 344 * 345 * @param v the value to set as the result of the computation. 346 * @param t the exception to set as the result of the computation. 347 * @param finalState the state to transition to. 348 */ complete(@ullable V v, @Nullable Throwable t, int finalState)349 private boolean complete(@Nullable V v, @Nullable Throwable t, 350 int finalState) { 351 boolean doCompletion = compareAndSetState(RUNNING, COMPLETING); 352 if (doCompletion) { 353 // If this thread successfully transitioned to COMPLETING, set the value 354 // and exception and then release to the final state. 355 this.value = v; 356 this.exception = t; 357 releaseShared(finalState); 358 } else if (getState() == COMPLETING) { 359 // If some other thread is currently completing the future, block until 360 // they are done so we can guarantee completion. 361 acquireShared(-1); 362 } 363 return doCompletion; 364 } 365 } 366 } 367