1 /* 2 * Copyright (C) 2007 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import java.util.concurrent.CancellationException; 20 import java.util.concurrent.ExecutionException; 21 import java.util.concurrent.Future; 22 import java.util.concurrent.TimeUnit; 23 import java.util.concurrent.TimeoutException; 24 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 25 26 /** 27 * <p>An abstract implementation of the {@link Future} interface. This class 28 * is an abstraction of {@link java.util.concurrent.FutureTask} to support use 29 * for tasks other than {@link Runnable}s. It uses an 30 * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and 31 * guarantee thread safety. It could be used as a base class to 32 * {@code FutureTask}, or any other implementor of the {@code Future} interface. 33 * 34 * <p>This class implements all methods in {@code Future}. Subclasses should 35 * provide a way to set the result of the computation through the protected 36 * methods {@link #set(Object)}, {@link #setException(Throwable)}, or 37 * {@link #cancel()}. If subclasses want to implement cancellation they can 38 * override the {@link #cancel(boolean)} method with a real implementation, the 39 * default implementation doesn't support cancellation. 40 * 41 * <p>The state changing methods all return a boolean indicating success or 42 * failure in changing the future's state. Valid states are running, 43 * completed, failed, or cancelled. Because this class does not implement 44 * cancellation it is left to the subclass to distinguish between created 45 * and running tasks. 46 * 47 * @author Sven Mawson 48 * @since 2009.09.15 <b>tentative</b> 49 */ 50 public abstract class AbstractFuture<V> implements Future<V> { 51 52 /** Synchronization control for AbstractFutures. */ 53 private final Sync<V> sync = new Sync<V>(); 54 55 /* 56 * Blocks until either the task completes or the timeout expires. Uses the 57 * sync blocking-with-timeout support provided by AQS. 58 */ get(long timeout, TimeUnit unit)59 public V get(long timeout, TimeUnit unit) throws InterruptedException, 60 TimeoutException, ExecutionException { 61 return sync.get(unit.toNanos(timeout)); 62 } 63 64 /* 65 * Blocks until the task completes or we get interrupted. Uses the 66 * interruptible blocking support provided by AQS. 67 */ get()68 public V get() throws InterruptedException, ExecutionException { 69 return sync.get(); 70 } 71 72 /* 73 * Checks if the sync is not in the running state. 74 */ isDone()75 public boolean isDone() { 76 return sync.isDone(); 77 } 78 79 /* 80 * Checks if the sync is in the cancelled state. 81 */ isCancelled()82 public boolean isCancelled() { 83 return sync.isCancelled(); 84 } 85 86 /* 87 * Default implementation of cancel that never cancels the future. 88 * Subclasses should override this to implement cancellation if desired. 89 */ cancel(boolean mayInterruptIfRunning)90 public boolean cancel(boolean mayInterruptIfRunning) { 91 return false; 92 } 93 94 /** 95 * Subclasses should invoke this method to set the result of the computation 96 * to {@code value}. This will set the state of the future to 97 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the 98 * state was successfully changed. 99 * 100 * @param value the value that was the result of the task. 101 * @return true if the state was successfully changed. 102 */ set(V value)103 protected boolean set(V value) { 104 boolean result = sync.set(value); 105 if (result) { 106 done(); 107 } 108 return result; 109 } 110 111 /** 112 * Subclasses should invoke this method to set the result of the computation 113 * to an error, {@code throwable}. This will set the state of the future to 114 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the 115 * state was successfully changed. 116 * 117 * @param throwable the exception that the task failed with. 118 * @return true if the state was successfully changed. 119 * @throws Error if the throwable was an {@link Error}. 120 */ setException(Throwable throwable)121 protected boolean setException(Throwable throwable) { 122 boolean result = sync.setException(throwable); 123 if (result) { 124 done(); 125 } 126 127 // If it's an Error, we want to make sure it reaches the top of the 128 // call stack, so we rethrow it. 129 if (throwable instanceof Error) { 130 throw (Error) throwable; 131 } 132 return result; 133 } 134 135 /** 136 * Subclasses should invoke this method to mark the future as cancelled. 137 * This will set the state of the future to {@link 138 * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was 139 * successfully changed. 140 * 141 * @return true if the state was successfully changed. 142 */ cancel()143 protected final boolean cancel() { 144 boolean result = sync.cancel(); 145 if (result) { 146 done(); 147 } 148 return result; 149 } 150 151 /* 152 * Called by the success, failed, or cancelled methods to indicate that the 153 * value is now available and the latch can be released. Subclasses can 154 * use this method to deal with any actions that should be undertaken when 155 * the task has completed. 156 */ done()157 protected void done() { 158 // Default implementation does nothing. 159 } 160 161 /** 162 * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a 163 * private subclass to hold the synchronizer. This synchronizer is used to 164 * implement the blocking and waiting calls as well as to handle state changes 165 * in a thread-safe manner. The current state of the future is held in the 166 * Sync state, and the lock is released whenever the state changes to either 167 * {@link #COMPLETED} or {@link #CANCELLED}. 168 * 169 * <p>To avoid races between threads doing release and acquire, we transition 170 * to the final state in two steps. One thread will successfully CAS from 171 * RUNNING to COMPLETING, that thread will then set the result of the 172 * computation, and only then transition to COMPLETED or CANCELLED. 173 * 174 * <p>We don't use the integer argument passed between acquire methods so we 175 * pass around a -1 everywhere. 176 */ 177 static final class Sync<V> extends AbstractQueuedSynchronizer { 178 179 private static final long serialVersionUID = 0L; 180 181 /* Valid states. */ 182 static final int RUNNING = 0; 183 static final int COMPLETING = 1; 184 static final int COMPLETED = 2; 185 static final int CANCELLED = 4; 186 187 private V value; 188 private ExecutionException exception; 189 190 /* 191 * Acquisition succeeds if the future is done, otherwise it fails. 192 */ 193 @Override tryAcquireShared(int ignored)194 protected int tryAcquireShared(int ignored) { 195 if (isDone()) { 196 return 1; 197 } 198 return -1; 199 } 200 201 /* 202 * We always allow a release to go through, this means the state has been 203 * successfully changed and the result is available. 204 */ 205 @Override tryReleaseShared(int finalState)206 protected boolean tryReleaseShared(int finalState) { 207 setState(finalState); 208 return true; 209 } 210 211 /** 212 * Blocks until the task is complete or the timeout expires. Throws a 213 * {@link TimeoutException} if the timer expires, otherwise behaves like 214 * {@link #get()}. 215 */ get(long nanos)216 V get(long nanos) throws TimeoutException, CancellationException, 217 ExecutionException, InterruptedException { 218 219 // Attempt to acquire the shared lock with a timeout. 220 if (!tryAcquireSharedNanos(-1, nanos)) { 221 throw new TimeoutException("Timeout waiting for task."); 222 } 223 224 return getValue(); 225 } 226 227 /** 228 * Blocks until {@link #complete(Object, Throwable, int)} has been 229 * successfully called. Throws a {@link CancellationException} if the task 230 * was cancelled, or a {@link ExecutionException} if the task completed with 231 * an error. 232 */ get()233 V get() throws CancellationException, ExecutionException, 234 InterruptedException { 235 236 // Acquire the shared lock allowing interruption. 237 acquireSharedInterruptibly(-1); 238 return getValue(); 239 } 240 241 /** 242 * Implementation of the actual value retrieval. Will return the value 243 * on success, an exception on failure, a cancellation on cancellation, or 244 * an illegal state if the synchronizer is in an invalid state. 245 */ getValue()246 private V getValue() throws CancellationException, ExecutionException { 247 int state = getState(); 248 switch (state) { 249 case COMPLETED: 250 if (exception != null) { 251 throw exception; 252 } else { 253 return value; 254 } 255 256 case CANCELLED: 257 throw new CancellationException("Task was cancelled."); 258 259 default: 260 throw new IllegalStateException( 261 "Error, synchronizer in invalid state: " + state); 262 } 263 } 264 265 /** 266 * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}. 267 */ isDone()268 boolean isDone() { 269 return (getState() & (COMPLETED | CANCELLED)) != 0; 270 } 271 272 /** 273 * Checks if the state is {@link #CANCELLED}. 274 */ isCancelled()275 boolean isCancelled() { 276 return getState() == CANCELLED; 277 } 278 279 /** 280 * Transition to the COMPLETED state and set the value. 281 */ set(V v)282 boolean set(V v) { 283 return complete(v, null, COMPLETED); 284 } 285 286 /** 287 * Transition to the COMPLETED state and set the exception. 288 */ setException(Throwable t)289 boolean setException(Throwable t) { 290 return complete(null, t, COMPLETED); 291 } 292 293 /** 294 * Transition to the CANCELLED state. 295 */ cancel()296 boolean cancel() { 297 return complete(null, null, CANCELLED); 298 } 299 300 /** 301 * Implementation of completing a task. Either {@code v} or {@code t} will 302 * be set but not both. The {@code finalState} is the state to change to 303 * from {@link #RUNNING}. If the state is not in the RUNNING state we 304 * return {@code false}. 305 * 306 * @param v the value to set as the result of the computation. 307 * @param t the exception to set as the result of the computation. 308 * @param finalState the state to transition to. 309 */ complete(V v, Throwable t, int finalState)310 private boolean complete(V v, Throwable t, int finalState) { 311 if (compareAndSetState(RUNNING, COMPLETING)) { 312 this.value = v; 313 this.exception = t == null ? null : new ExecutionException(t); 314 releaseShared(finalState); 315 return true; 316 } 317 318 // The state was not RUNNING, so there are no valid transitions. 319 return false; 320 } 321 } 322 } 323