• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2007 Google Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import java.util.concurrent.CancellationException;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Future;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
25 
26 /**
27  * <p>An abstract implementation of the {@link Future} interface.  This class
28  * is an abstraction of {@link java.util.concurrent.FutureTask} to support use
29  * for tasks other than {@link Runnable}s.  It uses an
30  * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and
31  * guarantee thread safety.  It could be used as a base class to
32  * {@code FutureTask}, or any other implementor of the {@code Future} interface.
33  *
34  * <p>This class implements all methods in {@code Future}.  Subclasses should
35  * provide a way to set the result of the computation through the protected
36  * methods {@link #set(Object)}, {@link #setException(Throwable)}, or
37  * {@link #cancel()}.  If subclasses want to implement cancellation they can
38  * override the {@link #cancel(boolean)} method with a real implementation, the
39  * default implementation doesn't support cancellation.
40  *
41  * <p>The state changing methods all return a boolean indicating success or
42  * failure in changing the future's state.  Valid states are running,
43  * completed, failed, or cancelled.  Because this class does not implement
44  * cancellation it is left to the subclass to distinguish between created
45  * and running tasks.
46  *
47  * @author Sven Mawson
48  * @since 2009.09.15 <b>tentative</b>
49  */
50 public abstract class AbstractFuture<V> implements Future<V> {
51 
52   /** Synchronization control for AbstractFutures. */
53   private final Sync<V> sync = new Sync<V>();
54 
55   /*
56    * Blocks until either the task completes or the timeout expires.  Uses the
57    * sync blocking-with-timeout support provided by AQS.
58    */
get(long timeout, TimeUnit unit)59   public V get(long timeout, TimeUnit unit) throws InterruptedException,
60       TimeoutException, ExecutionException {
61     return sync.get(unit.toNanos(timeout));
62   }
63 
64   /*
65    * Blocks until the task completes or we get interrupted. Uses the
66    * interruptible blocking support provided by AQS.
67    */
get()68   public V get() throws InterruptedException, ExecutionException {
69     return sync.get();
70   }
71 
72   /*
73    * Checks if the sync is not in the running state.
74    */
isDone()75   public boolean isDone() {
76     return sync.isDone();
77   }
78 
79   /*
80    * Checks if the sync is in the cancelled state.
81    */
isCancelled()82   public boolean isCancelled() {
83     return sync.isCancelled();
84   }
85 
86   /*
87    * Default implementation of cancel that never cancels the future.
88    * Subclasses should override this to implement cancellation if desired.
89    */
cancel(boolean mayInterruptIfRunning)90   public boolean cancel(boolean mayInterruptIfRunning) {
91     return false;
92   }
93 
94   /**
95    * Subclasses should invoke this method to set the result of the computation
96    * to {@code value}.  This will set the state of the future to
97    * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
98    * state was successfully changed.
99    *
100    * @param value the value that was the result of the task.
101    * @return true if the state was successfully changed.
102    */
set(V value)103   protected boolean set(V value) {
104     boolean result = sync.set(value);
105     if (result) {
106       done();
107     }
108     return result;
109   }
110 
111   /**
112    * Subclasses should invoke this method to set the result of the computation
113    * to an error, {@code throwable}.  This will set the state of the future to
114    * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
115    * state was successfully changed.
116    *
117    * @param throwable the exception that the task failed with.
118    * @return true if the state was successfully changed.
119    * @throws Error if the throwable was an {@link Error}.
120    */
setException(Throwable throwable)121   protected boolean setException(Throwable throwable) {
122     boolean result = sync.setException(throwable);
123     if (result) {
124       done();
125     }
126 
127     // If it's an Error, we want to make sure it reaches the top of the
128     // call stack, so we rethrow it.
129     if (throwable instanceof Error) {
130       throw (Error) throwable;
131     }
132     return result;
133   }
134 
135   /**
136    * Subclasses should invoke this method to mark the future as cancelled.
137    * This will set the state of the future to {@link
138    * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
139    * successfully changed.
140    *
141    * @return true if the state was successfully changed.
142    */
cancel()143   protected final boolean cancel() {
144     boolean result = sync.cancel();
145     if (result) {
146       done();
147     }
148     return result;
149   }
150 
151   /*
152    * Called by the success, failed, or cancelled methods to indicate that the
153    * value is now available and the latch can be released.  Subclasses can
154    * use this method to deal with any actions that should be undertaken when
155    * the task has completed.
156    */
done()157   protected void done() {
158     // Default implementation does nothing.
159   }
160 
161   /**
162    * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
163    * private subclass to hold the synchronizer.  This synchronizer is used to
164    * implement the blocking and waiting calls as well as to handle state changes
165    * in a thread-safe manner.  The current state of the future is held in the
166    * Sync state, and the lock is released whenever the state changes to either
167    * {@link #COMPLETED} or {@link #CANCELLED}.
168    *
169    * <p>To avoid races between threads doing release and acquire, we transition
170    * to the final state in two steps.  One thread will successfully CAS from
171    * RUNNING to COMPLETING, that thread will then set the result of the
172    * computation, and only then transition to COMPLETED or CANCELLED.
173    *
174    * <p>We don't use the integer argument passed between acquire methods so we
175    * pass around a -1 everywhere.
176    */
177   static final class Sync<V> extends AbstractQueuedSynchronizer {
178 
179     private static final long serialVersionUID = 0L;
180 
181     /* Valid states. */
182     static final int RUNNING = 0;
183     static final int COMPLETING = 1;
184     static final int COMPLETED = 2;
185     static final int CANCELLED = 4;
186 
187     private V value;
188     private ExecutionException exception;
189 
190     /*
191      * Acquisition succeeds if the future is done, otherwise it fails.
192      */
193     @Override
tryAcquireShared(int ignored)194     protected int tryAcquireShared(int ignored) {
195       if (isDone()) {
196         return 1;
197       }
198       return -1;
199     }
200 
201     /*
202      * We always allow a release to go through, this means the state has been
203      * successfully changed and the result is available.
204      */
205     @Override
tryReleaseShared(int finalState)206     protected boolean tryReleaseShared(int finalState) {
207       setState(finalState);
208       return true;
209     }
210 
211     /**
212      * Blocks until the task is complete or the timeout expires.  Throws a
213      * {@link TimeoutException} if the timer expires, otherwise behaves like
214      * {@link #get()}.
215      */
get(long nanos)216     V get(long nanos) throws TimeoutException, CancellationException,
217         ExecutionException, InterruptedException {
218 
219       // Attempt to acquire the shared lock with a timeout.
220       if (!tryAcquireSharedNanos(-1, nanos)) {
221         throw new TimeoutException("Timeout waiting for task.");
222       }
223 
224       return getValue();
225     }
226 
227     /**
228      * Blocks until {@link #complete(Object, Throwable, int)} has been
229      * successfully called.  Throws a {@link CancellationException} if the task
230      * was cancelled, or a {@link ExecutionException} if the task completed with
231      * an error.
232      */
get()233     V get() throws CancellationException, ExecutionException,
234         InterruptedException {
235 
236       // Acquire the shared lock allowing interruption.
237       acquireSharedInterruptibly(-1);
238       return getValue();
239     }
240 
241     /**
242      * Implementation of the actual value retrieval.  Will return the value
243      * on success, an exception on failure, a cancellation on cancellation, or
244      * an illegal state if the synchronizer is in an invalid state.
245      */
getValue()246     private V getValue() throws CancellationException, ExecutionException {
247       int state = getState();
248       switch (state) {
249         case COMPLETED:
250           if (exception != null) {
251             throw exception;
252           } else {
253             return value;
254           }
255 
256         case CANCELLED:
257           throw new CancellationException("Task was cancelled.");
258 
259         default:
260           throw new IllegalStateException(
261               "Error, synchronizer in invalid state: " + state);
262       }
263     }
264 
265     /**
266      * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
267      */
isDone()268     boolean isDone() {
269       return (getState() & (COMPLETED | CANCELLED)) != 0;
270     }
271 
272     /**
273      * Checks if the state is {@link #CANCELLED}.
274      */
isCancelled()275     boolean isCancelled() {
276       return getState() == CANCELLED;
277     }
278 
279     /**
280      * Transition to the COMPLETED state and set the value.
281      */
set(V v)282     boolean set(V v) {
283       return complete(v, null, COMPLETED);
284     }
285 
286     /**
287      * Transition to the COMPLETED state and set the exception.
288      */
setException(Throwable t)289     boolean setException(Throwable t) {
290       return complete(null, t, COMPLETED);
291     }
292 
293     /**
294      * Transition to the CANCELLED state.
295      */
cancel()296     boolean cancel() {
297       return complete(null, null, CANCELLED);
298     }
299 
300     /**
301      * Implementation of completing a task.  Either {@code v} or {@code t} will
302      * be set but not both.  The {@code finalState} is the state to change to
303      * from {@link #RUNNING}.  If the state is not in the RUNNING state we
304      * return {@code false}.
305      *
306      * @param v the value to set as the result of the computation.
307      * @param t the exception to set as the result of the computation.
308      * @param finalState the state to transition to.
309      */
complete(V v, Throwable t, int finalState)310     private boolean complete(V v, Throwable t, int finalState) {
311       if (compareAndSetState(RUNNING, COMPLETING)) {
312         this.value = v;
313         this.exception = t == null ? null : new ExecutionException(t);
314         releaseShared(finalState);
315         return true;
316       }
317 
318       // The state was not RUNNING, so there are no valid transitions.
319       return false;
320     }
321   }
322 }
323