1 /* 2 * Copyright (C) 2007 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import java.util.concurrent.CancellationException; 22 import java.util.concurrent.ExecutionException; 23 import java.util.concurrent.Executor; 24 import java.util.concurrent.TimeUnit; 25 import java.util.concurrent.TimeoutException; 26 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 27 28 import javax.annotation.Nullable; 29 30 /** 31 * An abstract implementation of the {@link ListenableFuture} interface. This 32 * class is preferable to {@link java.util.concurrent.FutureTask} for two 33 * reasons: It implements {@code ListenableFuture}, and it does not implement 34 * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code 35 * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your 36 * tasks to a {@link ListeningExecutorService}.) 37 * 38 * <p>This class implements all methods in {@code ListenableFuture}. 39 * Subclasses should provide a way to set the result of the computation through 40 * the protected methods {@link #set(Object)} and 41 * {@link #setException(Throwable)}. Subclasses may also override {@link 42 * #interruptTask()}, which will be invoked automatically if a call to {@link 43 * #cancel(boolean) cancel(true)} succeeds in canceling the future. 44 * 45 * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal 46 * with concurrency issues and guarantee thread safety. 47 * 48 * <p>The state changing methods all return a boolean indicating success or 49 * failure in changing the future's state. Valid states are running, 50 * completed, failed, or cancelled. 51 * 52 * <p>This class uses an {@link ExecutionList} to guarantee that all registered 53 * listeners will be executed, either when the future finishes or, for listeners 54 * that are added after the future completes, immediately. 55 * {@code Runnable}-{@code Executor} pairs are stored in the execution list but 56 * are not necessarily executed in the order in which they were added. (If a 57 * listener is added after the Future is complete, it will be executed 58 * immediately, even if earlier listeners have not been executed. Additionally, 59 * executors need not guarantee FIFO execution, or different listeners may run 60 * in different executors.) 61 * 62 * @author Sven Mawson 63 * @since 1.0 64 */ 65 public abstract class AbstractFuture<V> implements ListenableFuture<V> { 66 67 /** Synchronization control for AbstractFutures. */ 68 private final Sync<V> sync = new Sync<V>(); 69 70 // The execution list to hold our executors. 71 private final ExecutionList executionList = new ExecutionList(); 72 73 /** 74 * Constructor for use by subclasses. 75 */ AbstractFuture()76 protected AbstractFuture() {} 77 78 /* 79 * Improve the documentation of when InterruptedException is thrown. Our 80 * behavior matches the JDK's, but the JDK's documentation is misleading. 81 */ 82 /** 83 * {@inheritDoc} 84 * 85 * <p>The default {@link AbstractFuture} implementation throws {@code 86 * InterruptedException} if the current thread is interrupted before or during 87 * the call, even if the value is already available. 88 * 89 * @throws InterruptedException if the current thread was interrupted before 90 * or during the call (optional but recommended). 91 * @throws CancellationException {@inheritDoc} 92 */ 93 @Override get(long timeout, TimeUnit unit)94 public V get(long timeout, TimeUnit unit) throws InterruptedException, 95 TimeoutException, ExecutionException { 96 return sync.get(unit.toNanos(timeout)); 97 } 98 99 /* 100 * Improve the documentation of when InterruptedException is thrown. Our 101 * behavior matches the JDK's, but the JDK's documentation is misleading. 102 */ 103 /** 104 * {@inheritDoc} 105 * 106 * <p>The default {@link AbstractFuture} implementation throws {@code 107 * InterruptedException} if the current thread is interrupted before or during 108 * the call, even if the value is already available. 109 * 110 * @throws InterruptedException if the current thread was interrupted before 111 * or during the call (optional but recommended). 112 * @throws CancellationException {@inheritDoc} 113 */ 114 @Override get()115 public V get() throws InterruptedException, ExecutionException { 116 return sync.get(); 117 } 118 119 @Override isDone()120 public boolean isDone() { 121 return sync.isDone(); 122 } 123 124 @Override isCancelled()125 public boolean isCancelled() { 126 return sync.isCancelled(); 127 } 128 129 @Override cancel(boolean mayInterruptIfRunning)130 public boolean cancel(boolean mayInterruptIfRunning) { 131 if (!sync.cancel(mayInterruptIfRunning)) { 132 return false; 133 } 134 executionList.execute(); 135 if (mayInterruptIfRunning) { 136 interruptTask(); 137 } 138 return true; 139 } 140 141 /** 142 * Subclasses can override this method to implement interruption of the 143 * future's computation. The method is invoked automatically by a successful 144 * call to {@link #cancel(boolean) cancel(true)}. 145 * 146 * <p>The default implementation does nothing. 147 * 148 * @since 10.0 149 */ interruptTask()150 protected void interruptTask() { 151 } 152 153 /** 154 * Returns true if this future was cancelled with {@code 155 * mayInterruptIfRunning} set to {@code true}. 156 * 157 * @since 14.0 158 */ wasInterrupted()159 protected final boolean wasInterrupted() { 160 return sync.wasInterrupted(); 161 } 162 163 /** 164 * {@inheritDoc} 165 * 166 * @since 10.0 167 */ 168 @Override addListener(Runnable listener, Executor exec)169 public void addListener(Runnable listener, Executor exec) { 170 executionList.add(listener, exec); 171 } 172 173 /** 174 * Subclasses should invoke this method to set the result of the computation 175 * to {@code value}. This will set the state of the future to 176 * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the 177 * state was successfully changed. 178 * 179 * @param value the value that was the result of the task. 180 * @return true if the state was successfully changed. 181 */ set(@ullable V value)182 protected boolean set(@Nullable V value) { 183 boolean result = sync.set(value); 184 if (result) { 185 executionList.execute(); 186 } 187 return result; 188 } 189 190 /** 191 * Subclasses should invoke this method to set the result of the computation 192 * to an error, {@code throwable}. This will set the state of the future to 193 * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the 194 * state was successfully changed. 195 * 196 * @param throwable the exception that the task failed with. 197 * @return true if the state was successfully changed. 198 */ setException(Throwable throwable)199 protected boolean setException(Throwable throwable) { 200 boolean result = sync.setException(checkNotNull(throwable)); 201 if (result) { 202 executionList.execute(); 203 } 204 return result; 205 } 206 207 /** 208 * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a 209 * private subclass to hold the synchronizer. This synchronizer is used to 210 * implement the blocking and waiting calls as well as to handle state changes 211 * in a thread-safe manner. The current state of the future is held in the 212 * Sync state, and the lock is released whenever the state changes to 213 * {@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED} 214 * 215 * <p>To avoid races between threads doing release and acquire, we transition 216 * to the final state in two steps. One thread will successfully CAS from 217 * RUNNING to COMPLETING, that thread will then set the result of the 218 * computation, and only then transition to COMPLETED, CANCELLED, or 219 * INTERRUPTED. 220 * 221 * <p>We don't use the integer argument passed between acquire methods so we 222 * pass around a -1 everywhere. 223 */ 224 static final class Sync<V> extends AbstractQueuedSynchronizer { 225 226 private static final long serialVersionUID = 0L; 227 228 /* Valid states. */ 229 static final int RUNNING = 0; 230 static final int COMPLETING = 1; 231 static final int COMPLETED = 2; 232 static final int CANCELLED = 4; 233 static final int INTERRUPTED = 8; 234 235 private V value; 236 private Throwable exception; 237 238 /* 239 * Acquisition succeeds if the future is done, otherwise it fails. 240 */ 241 @Override tryAcquireShared(int ignored)242 protected int tryAcquireShared(int ignored) { 243 if (isDone()) { 244 return 1; 245 } 246 return -1; 247 } 248 249 /* 250 * We always allow a release to go through, this means the state has been 251 * successfully changed and the result is available. 252 */ 253 @Override tryReleaseShared(int finalState)254 protected boolean tryReleaseShared(int finalState) { 255 setState(finalState); 256 return true; 257 } 258 259 /** 260 * Blocks until the task is complete or the timeout expires. Throws a 261 * {@link TimeoutException} if the timer expires, otherwise behaves like 262 * {@link #get()}. 263 */ get(long nanos)264 V get(long nanos) throws TimeoutException, CancellationException, 265 ExecutionException, InterruptedException { 266 267 // Attempt to acquire the shared lock with a timeout. 268 if (!tryAcquireSharedNanos(-1, nanos)) { 269 throw new TimeoutException("Timeout waiting for task."); 270 } 271 272 return getValue(); 273 } 274 275 /** 276 * Blocks until {@link #complete(Object, Throwable, int)} has been 277 * successfully called. Throws a {@link CancellationException} if the task 278 * was cancelled, or a {@link ExecutionException} if the task completed with 279 * an error. 280 */ get()281 V get() throws CancellationException, ExecutionException, 282 InterruptedException { 283 284 // Acquire the shared lock allowing interruption. 285 acquireSharedInterruptibly(-1); 286 return getValue(); 287 } 288 289 /** 290 * Implementation of the actual value retrieval. Will return the value 291 * on success, an exception on failure, a cancellation on cancellation, or 292 * an illegal state if the synchronizer is in an invalid state. 293 */ getValue()294 private V getValue() throws CancellationException, ExecutionException { 295 int state = getState(); 296 switch (state) { 297 case COMPLETED: 298 if (exception != null) { 299 throw new ExecutionException(exception); 300 } else { 301 return value; 302 } 303 304 case CANCELLED: 305 case INTERRUPTED: 306 throw cancellationExceptionWithCause( 307 "Task was cancelled.", exception); 308 309 default: 310 throw new IllegalStateException( 311 "Error, synchronizer in invalid state: " + state); 312 } 313 } 314 315 /** 316 * Checks if the state is {@link #COMPLETED}, {@link #CANCELLED}, or {@link 317 * INTERRUPTED}. 318 */ isDone()319 boolean isDone() { 320 return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0; 321 } 322 323 /** 324 * Checks if the state is {@link #CANCELLED} or {@link #INTERRUPTED}. 325 */ isCancelled()326 boolean isCancelled() { 327 return (getState() & (CANCELLED | INTERRUPTED)) != 0; 328 } 329 330 /** 331 * Checks if the state is {@link #INTERRUPTED}. 332 */ wasInterrupted()333 boolean wasInterrupted() { 334 return getState() == INTERRUPTED; 335 } 336 337 /** 338 * Transition to the COMPLETED state and set the value. 339 */ set(@ullable V v)340 boolean set(@Nullable V v) { 341 return complete(v, null, COMPLETED); 342 } 343 344 /** 345 * Transition to the COMPLETED state and set the exception. 346 */ setException(Throwable t)347 boolean setException(Throwable t) { 348 return complete(null, t, COMPLETED); 349 } 350 351 /** 352 * Transition to the CANCELLED or INTERRUPTED state. 353 */ cancel(boolean interrupt)354 boolean cancel(boolean interrupt) { 355 return complete(null, null, interrupt ? INTERRUPTED : CANCELLED); 356 } 357 358 /** 359 * Implementation of completing a task. Either {@code v} or {@code t} will 360 * be set but not both. The {@code finalState} is the state to change to 361 * from {@link #RUNNING}. If the state is not in the RUNNING state we 362 * return {@code false} after waiting for the state to be set to a valid 363 * final state ({@link #COMPLETED}, {@link #CANCELLED}, or {@link 364 * #INTERRUPTED}). 365 * 366 * @param v the value to set as the result of the computation. 367 * @param t the exception to set as the result of the computation. 368 * @param finalState the state to transition to. 369 */ complete(@ullable V v, @Nullable Throwable t, int finalState)370 private boolean complete(@Nullable V v, @Nullable Throwable t, 371 int finalState) { 372 boolean doCompletion = compareAndSetState(RUNNING, COMPLETING); 373 if (doCompletion) { 374 // If this thread successfully transitioned to COMPLETING, set the value 375 // and exception and then release to the final state. 376 this.value = v; 377 // Don't actually construct a CancellationException until necessary. 378 this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0) 379 ? new CancellationException("Future.cancel() was called.") : t; 380 releaseShared(finalState); 381 } else if (getState() == COMPLETING) { 382 // If some other thread is currently completing the future, block until 383 // they are done so we can guarantee completion. 384 acquireShared(-1); 385 } 386 return doCompletion; 387 } 388 } 389 cancellationExceptionWithCause( @ullable String message, @Nullable Throwable cause)390 static final CancellationException cancellationExceptionWithCause( 391 @Nullable String message, @Nullable Throwable cause) { 392 CancellationException exception = new CancellationException(message); 393 exception.initCause(cause); 394 return exception; 395 } 396 } 397