• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2007 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
27 
28 import javax.annotation.Nullable;
29 
30 /**
31  * An abstract implementation of the {@link ListenableFuture} interface. This
32  * class is preferable to {@link java.util.concurrent.FutureTask} for two
33  * reasons: It implements {@code ListenableFuture}, and it does not implement
34  * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code
35  * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your
36  * tasks to a {@link ListeningExecutorService}.)
37  *
38  * <p>This class implements all methods in {@code ListenableFuture}.
39  * Subclasses should provide a way to set the result of the computation through
40  * the protected methods {@link #set(Object)} and
41  * {@link #setException(Throwable)}. Subclasses may also override {@link
42  * #interruptTask()}, which will be invoked automatically if a call to {@link
43  * #cancel(boolean) cancel(true)} succeeds in canceling the future.
44  *
45  * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal
46  * with concurrency issues and guarantee thread safety.
47  *
48  * <p>The state changing methods all return a boolean indicating success or
49  * failure in changing the future's state.  Valid states are running,
50  * completed, failed, or cancelled.
51  *
52  * <p>This class uses an {@link ExecutionList} to guarantee that all registered
53  * listeners will be executed, either when the future finishes or, for listeners
54  * that are added after the future completes, immediately.
55  * {@code Runnable}-{@code Executor} pairs are stored in the execution list but
56  * are not necessarily executed in the order in which they were added.  (If a
57  * listener is added after the Future is complete, it will be executed
58  * immediately, even if earlier listeners have not been executed. Additionally,
59  * executors need not guarantee FIFO execution, or different listeners may run
60  * in different executors.)
61  *
62  * @author Sven Mawson
63  * @since 1.0
64  */
65 public abstract class AbstractFuture<V> implements ListenableFuture<V> {
66 
67   /** Synchronization control for AbstractFutures. */
68   private final Sync<V> sync = new Sync<V>();
69 
70   // The execution list to hold our executors.
71   private final ExecutionList executionList = new ExecutionList();
72 
73   /**
74    * Constructor for use by subclasses.
75    */
AbstractFuture()76   protected AbstractFuture() {}
77 
78   /*
79    * Improve the documentation of when InterruptedException is thrown. Our
80    * behavior matches the JDK's, but the JDK's documentation is misleading.
81    */
82   /**
83    * {@inheritDoc}
84    *
85    * <p>The default {@link AbstractFuture} implementation throws {@code
86    * InterruptedException} if the current thread is interrupted before or during
87    * the call, even if the value is already available.
88    *
89    * @throws InterruptedException if the current thread was interrupted before
90    *     or during the call (optional but recommended).
91    * @throws CancellationException {@inheritDoc}
92    */
93   @Override
get(long timeout, TimeUnit unit)94   public V get(long timeout, TimeUnit unit) throws InterruptedException,
95       TimeoutException, ExecutionException {
96     return sync.get(unit.toNanos(timeout));
97   }
98 
99   /*
100    * Improve the documentation of when InterruptedException is thrown. Our
101    * behavior matches the JDK's, but the JDK's documentation is misleading.
102    */
103   /**
104    * {@inheritDoc}
105    *
106    * <p>The default {@link AbstractFuture} implementation throws {@code
107    * InterruptedException} if the current thread is interrupted before or during
108    * the call, even if the value is already available.
109    *
110    * @throws InterruptedException if the current thread was interrupted before
111    *     or during the call (optional but recommended).
112    * @throws CancellationException {@inheritDoc}
113    */
114   @Override
get()115   public V get() throws InterruptedException, ExecutionException {
116     return sync.get();
117   }
118 
119   @Override
isDone()120   public boolean isDone() {
121     return sync.isDone();
122   }
123 
124   @Override
isCancelled()125   public boolean isCancelled() {
126     return sync.isCancelled();
127   }
128 
129   @Override
cancel(boolean mayInterruptIfRunning)130   public boolean cancel(boolean mayInterruptIfRunning) {
131     if (!sync.cancel(mayInterruptIfRunning)) {
132       return false;
133     }
134     executionList.execute();
135     if (mayInterruptIfRunning) {
136       interruptTask();
137     }
138     return true;
139   }
140 
141   /**
142    * Subclasses can override this method to implement interruption of the
143    * future's computation. The method is invoked automatically by a successful
144    * call to {@link #cancel(boolean) cancel(true)}.
145    *
146    * <p>The default implementation does nothing.
147    *
148    * @since 10.0
149    */
interruptTask()150   protected void interruptTask() {
151   }
152 
153   /**
154    * Returns true if this future was cancelled with {@code
155    * mayInterruptIfRunning} set to {@code true}.
156    *
157    * @since 14.0
158    */
wasInterrupted()159   protected final boolean wasInterrupted() {
160     return sync.wasInterrupted();
161   }
162 
163   /**
164    * {@inheritDoc}
165    *
166    * @since 10.0
167    */
168   @Override
addListener(Runnable listener, Executor exec)169   public void addListener(Runnable listener, Executor exec) {
170     executionList.add(listener, exec);
171   }
172 
173   /**
174    * Subclasses should invoke this method to set the result of the computation
175    * to {@code value}.  This will set the state of the future to
176    * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
177    * state was successfully changed.
178    *
179    * @param value the value that was the result of the task.
180    * @return true if the state was successfully changed.
181    */
set(@ullable V value)182   protected boolean set(@Nullable V value) {
183     boolean result = sync.set(value);
184     if (result) {
185       executionList.execute();
186     }
187     return result;
188   }
189 
190   /**
191    * Subclasses should invoke this method to set the result of the computation
192    * to an error, {@code throwable}.  This will set the state of the future to
193    * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
194    * state was successfully changed.
195    *
196    * @param throwable the exception that the task failed with.
197    * @return true if the state was successfully changed.
198    */
setException(Throwable throwable)199   protected boolean setException(Throwable throwable) {
200     boolean result = sync.setException(checkNotNull(throwable));
201     if (result) {
202       executionList.execute();
203     }
204     return result;
205   }
206 
207   /**
208    * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
209    * private subclass to hold the synchronizer.  This synchronizer is used to
210    * implement the blocking and waiting calls as well as to handle state changes
211    * in a thread-safe manner.  The current state of the future is held in the
212    * Sync state, and the lock is released whenever the state changes to
213    * {@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}
214    *
215    * <p>To avoid races between threads doing release and acquire, we transition
216    * to the final state in two steps.  One thread will successfully CAS from
217    * RUNNING to COMPLETING, that thread will then set the result of the
218    * computation, and only then transition to COMPLETED, CANCELLED, or
219    * INTERRUPTED.
220    *
221    * <p>We don't use the integer argument passed between acquire methods so we
222    * pass around a -1 everywhere.
223    */
224   static final class Sync<V> extends AbstractQueuedSynchronizer {
225 
226     private static final long serialVersionUID = 0L;
227 
228     /* Valid states. */
229     static final int RUNNING = 0;
230     static final int COMPLETING = 1;
231     static final int COMPLETED = 2;
232     static final int CANCELLED = 4;
233     static final int INTERRUPTED = 8;
234 
235     private V value;
236     private Throwable exception;
237 
238     /*
239      * Acquisition succeeds if the future is done, otherwise it fails.
240      */
241     @Override
tryAcquireShared(int ignored)242     protected int tryAcquireShared(int ignored) {
243       if (isDone()) {
244         return 1;
245       }
246       return -1;
247     }
248 
249     /*
250      * We always allow a release to go through, this means the state has been
251      * successfully changed and the result is available.
252      */
253     @Override
tryReleaseShared(int finalState)254     protected boolean tryReleaseShared(int finalState) {
255       setState(finalState);
256       return true;
257     }
258 
259     /**
260      * Blocks until the task is complete or the timeout expires.  Throws a
261      * {@link TimeoutException} if the timer expires, otherwise behaves like
262      * {@link #get()}.
263      */
get(long nanos)264     V get(long nanos) throws TimeoutException, CancellationException,
265         ExecutionException, InterruptedException {
266 
267       // Attempt to acquire the shared lock with a timeout.
268       if (!tryAcquireSharedNanos(-1, nanos)) {
269         throw new TimeoutException("Timeout waiting for task.");
270       }
271 
272       return getValue();
273     }
274 
275     /**
276      * Blocks until {@link #complete(Object, Throwable, int)} has been
277      * successfully called.  Throws a {@link CancellationException} if the task
278      * was cancelled, or a {@link ExecutionException} if the task completed with
279      * an error.
280      */
get()281     V get() throws CancellationException, ExecutionException,
282         InterruptedException {
283 
284       // Acquire the shared lock allowing interruption.
285       acquireSharedInterruptibly(-1);
286       return getValue();
287     }
288 
289     /**
290      * Implementation of the actual value retrieval.  Will return the value
291      * on success, an exception on failure, a cancellation on cancellation, or
292      * an illegal state if the synchronizer is in an invalid state.
293      */
getValue()294     private V getValue() throws CancellationException, ExecutionException {
295       int state = getState();
296       switch (state) {
297         case COMPLETED:
298           if (exception != null) {
299             throw new ExecutionException(exception);
300           } else {
301             return value;
302           }
303 
304         case CANCELLED:
305         case INTERRUPTED:
306           throw cancellationExceptionWithCause(
307               "Task was cancelled.", exception);
308 
309         default:
310           throw new IllegalStateException(
311               "Error, synchronizer in invalid state: " + state);
312       }
313     }
314 
315     /**
316      * Checks if the state is {@link #COMPLETED}, {@link #CANCELLED}, or {@link
317      * INTERRUPTED}.
318      */
isDone()319     boolean isDone() {
320       return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;
321     }
322 
323     /**
324      * Checks if the state is {@link #CANCELLED} or {@link #INTERRUPTED}.
325      */
isCancelled()326     boolean isCancelled() {
327       return (getState() & (CANCELLED | INTERRUPTED)) != 0;
328     }
329 
330     /**
331      * Checks if the state is {@link #INTERRUPTED}.
332      */
wasInterrupted()333     boolean wasInterrupted() {
334       return getState() == INTERRUPTED;
335     }
336 
337     /**
338      * Transition to the COMPLETED state and set the value.
339      */
set(@ullable V v)340     boolean set(@Nullable V v) {
341       return complete(v, null, COMPLETED);
342     }
343 
344     /**
345      * Transition to the COMPLETED state and set the exception.
346      */
setException(Throwable t)347     boolean setException(Throwable t) {
348       return complete(null, t, COMPLETED);
349     }
350 
351     /**
352      * Transition to the CANCELLED or INTERRUPTED state.
353      */
cancel(boolean interrupt)354     boolean cancel(boolean interrupt) {
355       return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);
356     }
357 
358     /**
359      * Implementation of completing a task.  Either {@code v} or {@code t} will
360      * be set but not both.  The {@code finalState} is the state to change to
361      * from {@link #RUNNING}.  If the state is not in the RUNNING state we
362      * return {@code false} after waiting for the state to be set to a valid
363      * final state ({@link #COMPLETED}, {@link #CANCELLED}, or {@link
364      * #INTERRUPTED}).
365      *
366      * @param v the value to set as the result of the computation.
367      * @param t the exception to set as the result of the computation.
368      * @param finalState the state to transition to.
369      */
complete(@ullable V v, @Nullable Throwable t, int finalState)370     private boolean complete(@Nullable V v, @Nullable Throwable t,
371         int finalState) {
372       boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
373       if (doCompletion) {
374         // If this thread successfully transitioned to COMPLETING, set the value
375         // and exception and then release to the final state.
376         this.value = v;
377         // Don't actually construct a CancellationException until necessary.
378         this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0)
379             ? new CancellationException("Future.cancel() was called.") : t;
380         releaseShared(finalState);
381       } else if (getState() == COMPLETING) {
382         // If some other thread is currently completing the future, block until
383         // they are done so we can guarantee completion.
384         acquireShared(-1);
385       }
386       return doCompletion;
387     }
388   }
389 
cancellationExceptionWithCause( @ullable String message, @Nullable Throwable cause)390   static final CancellationException cancellationExceptionWithCause(
391       @Nullable String message, @Nullable Throwable cause) {
392     CancellationException exception = new CancellationException(message);
393     exception.initCause(cause);
394     return exception;
395   }
396 }
397