• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2014 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.errorprone.annotations.CanIgnoreReturnValue;
22 import java.util.concurrent.CancellationException;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
28 import org.checkerframework.checker.nullness.qual.Nullable;
29 
/** Utilities shared by the {@code AbstractFuture} benchmarks. */
31 final class AbstractFutureBenchmarks {
AbstractFutureBenchmarks()32   private AbstractFutureBenchmarks() {}
33 
34   interface Facade<T> extends ListenableFuture<T> {
35     @CanIgnoreReturnValue
set(T t)36     boolean set(T t);
37 
38     @CanIgnoreReturnValue
setException(Throwable t)39     boolean setException(Throwable t);
40   }
41 
42   private static class NewAbstractFutureFacade<T> extends AbstractFuture<T> implements Facade<T> {
43     @CanIgnoreReturnValue
44     @Override
set(T t)45     public boolean set(T t) {
46       return super.set(t);
47     }
48 
49     @CanIgnoreReturnValue
50     @Override
setException(Throwable t)51     public boolean setException(Throwable t) {
52       return super.setException(t);
53     }
54   }
55 
56   private static class OldAbstractFutureFacade<T> extends OldAbstractFuture<T>
57       implements Facade<T> {
58     @CanIgnoreReturnValue
59     @Override
set(T t)60     public boolean set(T t) {
61       return super.set(t);
62     }
63 
64     @CanIgnoreReturnValue
65     @Override
setException(Throwable t)66     public boolean setException(Throwable t) {
67       return super.setException(t);
68     }
69   }
70 
71   enum Impl {
72     NEW {
73       @Override
newFacade()74       <T> Facade<T> newFacade() {
75         return new NewAbstractFutureFacade<T>();
76       }
77     },
78     OLD {
79       @Override
newFacade()80       <T> Facade<T> newFacade() {
81         return new OldAbstractFutureFacade<T>();
82       }
83     };
84 
newFacade()85     abstract <T> Facade<T> newFacade();
86   }
87 
awaitWaiting(Thread t)88   static void awaitWaiting(Thread t) {
89     while (true) {
90       Thread.State state = t.getState();
91       switch (state) {
92         case RUNNABLE:
93         case BLOCKED:
94           Thread.yield();
95           break;
96         case WAITING:
97           return;
98         default:
99           throw new AssertionError("unexpected state: " + state);
100       }
101     }
102   }
103 
104   abstract static class OldAbstractFuture<V> implements ListenableFuture<V> {
105 
106     /** Synchronization control for AbstractFutures. */
107     private final Sync<V> sync = new Sync<V>();
108 
109     // The execution list to hold our executors.
110     private final ExecutionList executionList = new ExecutionList();
111 
112     /** Constructor for use by subclasses. */
OldAbstractFuture()113     protected OldAbstractFuture() {}
114 
115     /*
116      * Improve the documentation of when InterruptedException is thrown. Our
117      * behavior matches the JDK's, but the JDK's documentation is misleading.
118      */
119     /**
120      * {@inheritDoc}
121      *
122      * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if
123      * the current thread is interrupted before or during the call, even if the value is already
124      * available.
125      *
126      * @throws InterruptedException if the current thread was interrupted before or during the call
127      *     (optional but recommended).
128      * @throws CancellationException {@inheritDoc}
129      */
130     @CanIgnoreReturnValue
131     @Override
get(long timeout, TimeUnit unit)132     public V get(long timeout, TimeUnit unit)
133         throws InterruptedException, TimeoutException, ExecutionException {
134       return sync.get(unit.toNanos(timeout));
135     }
136 
137     /*
138      * Improve the documentation of when InterruptedException is thrown. Our
139      * behavior matches the JDK's, but the JDK's documentation is misleading.
140      */
141     /**
142      * {@inheritDoc}
143      *
144      * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if
145      * the current thread is interrupted before or during the call, even if the value is already
146      * available.
147      *
148      * @throws InterruptedException if the current thread was interrupted before or during the call
149      *     (optional but recommended).
150      * @throws CancellationException {@inheritDoc}
151      */
152     @CanIgnoreReturnValue
153     @Override
get()154     public V get() throws InterruptedException, ExecutionException {
155       return sync.get();
156     }
157 
158     @Override
isDone()159     public boolean isDone() {
160       return sync.isDone();
161     }
162 
163     @Override
isCancelled()164     public boolean isCancelled() {
165       return sync.isCancelled();
166     }
167 
168     @CanIgnoreReturnValue
169     @Override
cancel(boolean mayInterruptIfRunning)170     public boolean cancel(boolean mayInterruptIfRunning) {
171       if (!sync.cancel(mayInterruptIfRunning)) {
172         return false;
173       }
174       executionList.execute();
175       if (mayInterruptIfRunning) {
176         interruptTask();
177       }
178       return true;
179     }
180 
181     /**
182      * Subclasses can override this method to implement interruption of the future's computation.
183      * The method is invoked automatically by a successful call to {@link #cancel(boolean)
184      * cancel(true)}.
185      *
186      * <p>The default implementation does nothing.
187      *
188      * @since 10.0
189      */
interruptTask()190     protected void interruptTask() {}
191 
192     /**
193      * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code
194      * true}.
195      *
196      * @since 14.0
197      */
wasInterrupted()198     protected final boolean wasInterrupted() {
199       return sync.wasInterrupted();
200     }
201 
202     /**
203      * {@inheritDoc}
204      *
205      * @since 10.0
206      */
207     @Override
addListener(Runnable listener, Executor exec)208     public void addListener(Runnable listener, Executor exec) {
209       executionList.add(listener, exec);
210     }
211 
212     /**
213      * Subclasses should invoke this method to set the result of the computation to {@code value}.
214      * This will set the state of the future to {@link OldAbstractFuture.Sync#COMPLETED} and invoke
215      * the listeners if the state was successfully changed.
216      *
217      * @param value the value that was the result of the task.
218      * @return true if the state was successfully changed.
219      */
220     @CanIgnoreReturnValue
set(@ullable V value)221     protected boolean set(@Nullable V value) {
222       boolean result = sync.set(value);
223       if (result) {
224         executionList.execute();
225       }
226       return result;
227     }
228 
229     /**
230      * Subclasses should invoke this method to set the result of the computation to an error, {@code
231      * throwable}. This will set the state of the future to {@link OldAbstractFuture.Sync#COMPLETED}
232      * and invoke the listeners if the state was successfully changed.
233      *
234      * @param throwable the exception that the task failed with.
235      * @return true if the state was successfully changed.
236      */
237     @CanIgnoreReturnValue
setException(Throwable throwable)238     protected boolean setException(Throwable throwable) {
239       boolean result = sync.setException(checkNotNull(throwable));
240       if (result) {
241         executionList.execute();
242       }
243       return result;
244     }
245 
246     /**
247      * Following the contract of {@link AbstractQueuedSynchronizer} we create a private subclass to
248      * hold the synchronizer. This synchronizer is used to implement the blocking and waiting calls
249      * as well as to handle state changes in a thread-safe manner. The current state of the future
250      * is held in the Sync state, and the lock is released whenever the state changes to {@link
251      * #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}
252      *
253      * <p>To avoid races between threads doing release and acquire, we transition to the final state
254      * in two steps. One thread will successfully CAS from RUNNING to COMPLETING, that thread will
255      * then set the result of the computation, and only then transition to COMPLETED, CANCELLED, or
256      * INTERRUPTED.
257      *
258      * <p>We don't use the integer argument passed between acquire methods so we pass around a -1
259      * everywhere.
260      */
261     static final class Sync<V> extends AbstractQueuedSynchronizer {
262 
263       private static final long serialVersionUID = 0L;
264 
265       /* Valid states. */
266       static final int RUNNING = 0;
267       static final int COMPLETING = 1;
268       static final int COMPLETED = 2;
269       static final int CANCELLED = 4;
270       static final int INTERRUPTED = 8;
271 
272       private V value;
273       private Throwable exception;
274 
275       /*
276        * Acquisition succeeds if the future is done, otherwise it fails.
277        */
278       @Override
tryAcquireShared(int ignored)279       protected int tryAcquireShared(int ignored) {
280         if (isDone()) {
281           return 1;
282         }
283         return -1;
284       }
285 
286       /*
287        * We always allow a release to go through, this means the state has been
288        * successfully changed and the result is available.
289        */
290       @Override
tryReleaseShared(int finalState)291       protected boolean tryReleaseShared(int finalState) {
292         setState(finalState);
293         return true;
294       }
295 
296       /**
297        * Blocks until the task is complete or the timeout expires. Throws a {@link TimeoutException}
298        * if the timer expires, otherwise behaves like {@link #get()}.
299        */
get(long nanos)300       V get(long nanos)
301           throws TimeoutException, CancellationException, ExecutionException, InterruptedException {
302 
303         // Attempt to acquire the shared lock with a timeout.
304         if (!tryAcquireSharedNanos(-1, nanos)) {
305           throw new TimeoutException("Timeout waiting for task.");
306         }
307 
308         return getValue();
309       }
310 
311       /**
312        * Blocks until {@link #complete(Object, Throwable, int)} has been successfully called. Throws
313        * a {@link CancellationException} if the task was cancelled, or a {@link ExecutionException}
314        * if the task completed with an error.
315        */
get()316       V get() throws CancellationException, ExecutionException, InterruptedException {
317 
318         // Acquire the shared lock allowing interruption.
319         acquireSharedInterruptibly(-1);
320         return getValue();
321       }
322 
323       /**
324        * Implementation of the actual value retrieval. Will return the value on success, an
325        * exception on failure, a cancellation on cancellation, or an illegal state if the
326        * synchronizer is in an invalid state.
327        */
getValue()328       private V getValue() throws CancellationException, ExecutionException {
329         int state = getState();
330         switch (state) {
331           case COMPLETED:
332             if (exception != null) {
333               throw new ExecutionException(exception);
334             } else {
335               return value;
336             }
337 
338           case CANCELLED:
339           case INTERRUPTED:
340             throw cancellationExceptionWithCause("Task was cancelled.", exception);
341 
342           default:
343             throw new IllegalStateException("Error, synchronizer in invalid state: " + state);
344         }
345       }
346 
347       /** Checks if the state is {@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}. */
isDone()348       boolean isDone() {
349         return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;
350       }
351 
352       /** Checks if the state is {@link #CANCELLED} or {@link #INTERRUPTED}. */
isCancelled()353       boolean isCancelled() {
354         return (getState() & (CANCELLED | INTERRUPTED)) != 0;
355       }
356 
357       /** Checks if the state is {@link #INTERRUPTED}. */
wasInterrupted()358       boolean wasInterrupted() {
359         return getState() == INTERRUPTED;
360       }
361 
362       /** Transition to the COMPLETED state and set the value. */
set(@ullable V v)363       boolean set(@Nullable V v) {
364         return complete(v, null, COMPLETED);
365       }
366 
367       /** Transition to the COMPLETED state and set the exception. */
setException(Throwable t)368       boolean setException(Throwable t) {
369         return complete(null, t, COMPLETED);
370       }
371 
372       /** Transition to the CANCELLED or INTERRUPTED state. */
cancel(boolean interrupt)373       boolean cancel(boolean interrupt) {
374         return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);
375       }
376 
377       /**
378        * Implementation of completing a task. Either {@code v} or {@code t} will be set but not
379        * both. The {@code finalState} is the state to change to from {@link #RUNNING}. If the state
380        * is not in the RUNNING state we return {@code false} after waiting for the state to be set
381        * to a valid final state ({@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}).
382        *
383        * @param v the value to set as the result of the computation.
384        * @param t the exception to set as the result of the computation.
385        * @param finalState the state to transition to.
386        */
complete(@ullable V v, @Nullable Throwable t, int finalState)387       private boolean complete(@Nullable V v, @Nullable Throwable t, int finalState) {
388         boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
389         if (doCompletion) {
390           // If this thread successfully transitioned to COMPLETING, set the value
391           // and exception and then release to the final state.
392           this.value = v;
393           // Don't actually construct a CancellationException until necessary.
394           this.exception =
395               ((finalState & (CANCELLED | INTERRUPTED)) != 0)
396                   ? new CancellationException("Future.cancel() was called.")
397                   : t;
398           releaseShared(finalState);
399         } else if (getState() == COMPLETING) {
400           // If some other thread is currently completing the future, block until
401           // they are done so we can guarantee completion.
402           acquireShared(-1);
403         }
404         return doCompletion;
405       }
406     }
407 
cancellationExceptionWithCause( @ullable String message, @Nullable Throwable cause)408     static final CancellationException cancellationExceptionWithCause(
409         @Nullable String message, @Nullable Throwable cause) {
410       CancellationException exception = new CancellationException(message);
411       exception.initCause(cause);
412       return exception;
413     }
414   }
415 }
416