• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2007 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
27 
28 import javax.annotation.Nullable;
29 
30 /**
31  * An abstract implementation of the {@link ListenableFuture} interface. This
32  * class is preferable to {@link java.util.concurrent.FutureTask} for two
33  * reasons: It implements {@code ListenableFuture}, and it does not implement
34  * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code
35  * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your
36  * tasks to a {@link ListeningExecutorService}.)
37  *
38  * <p>This class implements all methods in {@code ListenableFuture}.
39  * Subclasses should provide a way to set the result of the computation through
40  * the protected methods {@link #set(Object)} and
41  * {@link #setException(Throwable)}. Subclasses may also override {@link
42  * #interruptTask()}, which will be invoked automatically if a call to {@link
43  * #cancel(boolean) cancel(true)} succeeds in canceling the future.
44  *
45  * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal
46  * with concurrency issues and guarantee thread safety.
47  *
48  * <p>The state changing methods all return a boolean indicating success or
49  * failure in changing the future's state.  Valid states are running,
50  * completed, failed, or cancelled.
51  *
52  * <p>This class uses an {@link ExecutionList} to guarantee that all registered
53  * listeners will be executed, either when the future finishes or, for listeners
54  * that are added after the future completes, immediately.
55  * {@code Runnable}-{@code Executor} pairs are stored in the execution list but
56  * are not necessarily executed in the order in which they were added.  (If a
57  * listener is added after the Future is complete, it will be executed
58  * immediately, even if earlier listeners have not been executed. Additionally,
59  * executors need not guarantee FIFO execution, or different listeners may run
60  * in different executors.)
61  *
62  * @author Sven Mawson
63  * @since 1.0
64  */
65 public abstract class AbstractFuture<V> implements ListenableFuture<V> {
66 
67   /** Synchronization control for AbstractFutures. */
68   private final Sync<V> sync = new Sync<V>();
69 
70   // The execution list to hold our executors.
71   private final ExecutionList executionList = new ExecutionList();
72 
73   /*
74    * Improve the documentation of when InterruptedException is thrown. Our
75    * behavior matches the JDK's, but the JDK's documentation is misleading.
76    */
77   /**
78    * {@inheritDoc}
79    *
80    * <p>The default {@link AbstractFuture} implementation throws {@code
81    * InterruptedException} if the current thread is interrupted before or during
82    * the call, even if the value is already available.
83    *
84    * @throws InterruptedException if the current thread was interrupted before
85    *     or during the call (optional but recommended).
86    * @throws CancellationException {@inheritDoc}
87    */
88   @Override
get(long timeout, TimeUnit unit)89   public V get(long timeout, TimeUnit unit) throws InterruptedException,
90       TimeoutException, ExecutionException {
91     return sync.get(unit.toNanos(timeout));
92   }
93 
94   /*
95    * Improve the documentation of when InterruptedException is thrown. Our
96    * behavior matches the JDK's, but the JDK's documentation is misleading.
97    */
98   /**
99    * {@inheritDoc}
100    *
101    * <p>The default {@link AbstractFuture} implementation throws {@code
102    * InterruptedException} if the current thread is interrupted before or during
103    * the call, even if the value is already available.
104    *
105    * @throws InterruptedException if the current thread was interrupted before
106    *     or during the call (optional but recommended).
107    * @throws CancellationException {@inheritDoc}
108    */
109   @Override
get()110   public V get() throws InterruptedException, ExecutionException {
111     return sync.get();
112   }
113 
114   @Override
isDone()115   public boolean isDone() {
116     return sync.isDone();
117   }
118 
119   @Override
isCancelled()120   public boolean isCancelled() {
121     return sync.isCancelled();
122   }
123 
124   @Override
cancel(boolean mayInterruptIfRunning)125   public boolean cancel(boolean mayInterruptIfRunning) {
126     if (!sync.cancel()) {
127       return false;
128     }
129     executionList.execute();
130     if (mayInterruptIfRunning) {
131       interruptTask();
132     }
133     return true;
134   }
135 
136   /**
137    * Subclasses can override this method to implement interruption of the
138    * future's computation. The method is invoked automatically by a successful
139    * call to {@link #cancel(boolean) cancel(true)}.
140    *
141    * <p>The default implementation does nothing.
142    *
143    * @since 10.0
144    */
interruptTask()145   protected void interruptTask() {
146   }
147 
148   /**
149    * {@inheritDoc}
150    *
151    * @since 10.0
152    */
153   @Override
addListener(Runnable listener, Executor exec)154   public void addListener(Runnable listener, Executor exec) {
155     executionList.add(listener, exec);
156   }
157 
158   /**
159    * Subclasses should invoke this method to set the result of the computation
160    * to {@code value}.  This will set the state of the future to
161    * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
162    * state was successfully changed.
163    *
164    * @param value the value that was the result of the task.
165    * @return true if the state was successfully changed.
166    */
set(@ullable V value)167   protected boolean set(@Nullable V value) {
168     boolean result = sync.set(value);
169     if (result) {
170       executionList.execute();
171     }
172     return result;
173   }
174 
175   /**
176    * Subclasses should invoke this method to set the result of the computation
177    * to an error, {@code throwable}.  This will set the state of the future to
178    * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
179    * state was successfully changed.
180    *
181    * @param throwable the exception that the task failed with.
182    * @return true if the state was successfully changed.
183    * @throws Error if the throwable was an {@link Error}.
184    */
setException(Throwable throwable)185   protected boolean setException(Throwable throwable) {
186     boolean result = sync.setException(checkNotNull(throwable));
187     if (result) {
188       executionList.execute();
189     }
190 
191     // If it's an Error, we want to make sure it reaches the top of the
192     // call stack, so we rethrow it.
193     if (throwable instanceof Error) {
194       throw (Error) throwable;
195     }
196     return result;
197   }
198 
199   /**
200    * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
201    * private subclass to hold the synchronizer.  This synchronizer is used to
202    * implement the blocking and waiting calls as well as to handle state changes
203    * in a thread-safe manner.  The current state of the future is held in the
204    * Sync state, and the lock is released whenever the state changes to either
205    * {@link #COMPLETED} or {@link #CANCELLED}.
206    *
207    * <p>To avoid races between threads doing release and acquire, we transition
208    * to the final state in two steps.  One thread will successfully CAS from
209    * RUNNING to COMPLETING, that thread will then set the result of the
210    * computation, and only then transition to COMPLETED or CANCELLED.
211    *
212    * <p>We don't use the integer argument passed between acquire methods so we
213    * pass around a -1 everywhere.
214    */
215   static final class Sync<V> extends AbstractQueuedSynchronizer {
216 
217     private static final long serialVersionUID = 0L;
218 
219     /* Valid states. */
220     static final int RUNNING = 0;
221     static final int COMPLETING = 1;
222     static final int COMPLETED = 2;
223     static final int CANCELLED = 4;
224 
225     private V value;
226     private Throwable exception;
227 
228     /*
229      * Acquisition succeeds if the future is done, otherwise it fails.
230      */
231     @Override
tryAcquireShared(int ignored)232     protected int tryAcquireShared(int ignored) {
233       if (isDone()) {
234         return 1;
235       }
236       return -1;
237     }
238 
239     /*
240      * We always allow a release to go through, this means the state has been
241      * successfully changed and the result is available.
242      */
243     @Override
tryReleaseShared(int finalState)244     protected boolean tryReleaseShared(int finalState) {
245       setState(finalState);
246       return true;
247     }
248 
249     /**
250      * Blocks until the task is complete or the timeout expires.  Throws a
251      * {@link TimeoutException} if the timer expires, otherwise behaves like
252      * {@link #get()}.
253      */
get(long nanos)254     V get(long nanos) throws TimeoutException, CancellationException,
255         ExecutionException, InterruptedException {
256 
257       // Attempt to acquire the shared lock with a timeout.
258       if (!tryAcquireSharedNanos(-1, nanos)) {
259         throw new TimeoutException("Timeout waiting for task.");
260       }
261 
262       return getValue();
263     }
264 
265     /**
266      * Blocks until {@link #complete(Object, Throwable, int)} has been
267      * successfully called.  Throws a {@link CancellationException} if the task
268      * was cancelled, or a {@link ExecutionException} if the task completed with
269      * an error.
270      */
get()271     V get() throws CancellationException, ExecutionException,
272         InterruptedException {
273 
274       // Acquire the shared lock allowing interruption.
275       acquireSharedInterruptibly(-1);
276       return getValue();
277     }
278 
279     /**
280      * Implementation of the actual value retrieval.  Will return the value
281      * on success, an exception on failure, a cancellation on cancellation, or
282      * an illegal state if the synchronizer is in an invalid state.
283      */
getValue()284     private V getValue() throws CancellationException, ExecutionException {
285       int state = getState();
286       switch (state) {
287         case COMPLETED:
288           if (exception != null) {
289             throw new ExecutionException(exception);
290           } else {
291             return value;
292           }
293 
294         case CANCELLED:
295           throw new CancellationException("Task was cancelled.");
296 
297         default:
298           throw new IllegalStateException(
299               "Error, synchronizer in invalid state: " + state);
300       }
301     }
302 
303     /**
304      * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
305      */
isDone()306     boolean isDone() {
307       return (getState() & (COMPLETED | CANCELLED)) != 0;
308     }
309 
310     /**
311      * Checks if the state is {@link #CANCELLED}.
312      */
isCancelled()313     boolean isCancelled() {
314       return getState() == CANCELLED;
315     }
316 
317     /**
318      * Transition to the COMPLETED state and set the value.
319      */
set(@ullable V v)320     boolean set(@Nullable V v) {
321       return complete(v, null, COMPLETED);
322     }
323 
324     /**
325      * Transition to the COMPLETED state and set the exception.
326      */
setException(Throwable t)327     boolean setException(Throwable t) {
328       return complete(null, t, COMPLETED);
329     }
330 
331     /**
332      * Transition to the CANCELLED state.
333      */
cancel()334     boolean cancel() {
335       return complete(null, null, CANCELLED);
336     }
337 
338     /**
339      * Implementation of completing a task.  Either {@code v} or {@code t} will
340      * be set but not both.  The {@code finalState} is the state to change to
341      * from {@link #RUNNING}.  If the state is not in the RUNNING state we
342      * return {@code false} after waiting for the state to be set to a valid
343      * final state ({@link #COMPLETED} or {@link #CANCELLED}).
344      *
345      * @param v the value to set as the result of the computation.
346      * @param t the exception to set as the result of the computation.
347      * @param finalState the state to transition to.
348      */
complete(@ullable V v, @Nullable Throwable t, int finalState)349     private boolean complete(@Nullable V v, @Nullable Throwable t,
350         int finalState) {
351       boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
352       if (doCompletion) {
353         // If this thread successfully transitioned to COMPLETING, set the value
354         // and exception and then release to the final state.
355         this.value = v;
356         this.exception = t;
357         releaseShared(finalState);
358       } else if (getState() == COMPLETING) {
359         // If some other thread is currently completing the future, block until
360         // they are done so we can guarantee completion.
361         acquireShared(-1);
362       }
363       return doCompletion;
364     }
365   }
366 }
367