1 /*
2  * Copyright 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.work.impl.utils.futures;
18 
19 import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
20 
21 import androidx.annotation.RestrictTo;
22 
23 import com.google.common.util.concurrent.ListenableFuture;
24 
25 import org.jspecify.annotations.NonNull;
26 import org.jspecify.annotations.Nullable;
27 
28 import java.util.Locale;
29 import java.util.concurrent.CancellationException;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.ScheduledFuture;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
37 import java.util.concurrent.locks.LockSupport;
38 import java.util.logging.Level;
39 import java.util.logging.Logger;
40 
41 /**
42  * Cloned from concurrent-futures package to avoid AndroidX namespace issues since there is no
43  * supportlib 28.* equivalent of this class.
44  *
45  * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More
46  * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture},
47  * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an
48  * existing one, typically using methods like {@link Futures#transform(ListenableFuture,
49  * com.google.common.base.Function, Executor) Futures.transform} and {@link
50  * Futures#catching(ListenableFuture, Class, com.google.common.base.Function,
51  * Executor) Futures.catching}.
52  *
53  * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way
54  * to set the result of the computation through the protected methods {@link #set(Object)}, {@link
55  * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override
56  * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses
57  * should rarely override other methods.
58  *
59  * @author Sven Mawson
60  * @author Luke Sandberg
61  * @since 1.0
62  */
63 @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
64 @SuppressWarnings("ShortCircuitBoolean") // we use non-short circuiting comparisons intentionally
65 public abstract class AbstractFuture<V> implements ListenableFuture<V> {
66 
67     // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, ||
68 
69     @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor.
70     static final boolean GENERATE_CANCELLATION_CAUSES =
71             Boolean.parseBoolean(
72                     System.getProperty("guava.concurrent.generate_cancellation_cause", "false"));
73 
74     // Logger to log exceptions caught when running listeners.
75     private static final Logger log = Logger.getLogger(AbstractFuture.class.getName());
76 
77     // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of
78     // blocking. This value is what AbstractQueuedSynchronizer uses.
79     private static final long SPIN_THRESHOLD_NANOS = 1000L;
80 
81     @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor.
82     static final AtomicHelper ATOMIC_HELPER;
83 
84     static {
85         AtomicHelper helper;
86         Throwable thrownAtomicReferenceFieldUpdaterFailure = null;
87 
88         // The access control checks that ARFU does means the caller class has to be
89         // AbstractFuture instead of SafeAtomicHelper, so we annoyingly define these here
90         try {
91             helper =
92                     new SafeAtomicHelper(
93                             newUpdater(Waiter.class, Thread.class, "thread"),
94                             newUpdater(Waiter.class, Waiter.class, "next"),
95                             newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
96                             newUpdater(AbstractFuture.class, Listener.class, "listeners"),
97                             newUpdater(AbstractFuture.class, Object.class, "value"));
98         } catch (Throwable atomicReferenceFieldUpdaterFailure) {
99             // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause
100             // getDeclaredField to throw a NoSuchFieldException when the field is definitely
101             // there. For these users fallback to a suboptimal implementation,
102             // based on synchronized. This will be a definite performance hit to those users.
103             thrownAtomicReferenceFieldUpdaterFailure = atomicReferenceFieldUpdaterFailure;
104             helper = new SynchronizedHelper();
105         }
106 
107         ATOMIC_HELPER = helper;
108 
109         // Prevent rare disastrous classloading in first call to LockSupport.park.
110         // See: https://bugs.openjdk.java.net/browse/JDK-8074773
111         @SuppressWarnings("unused")
112         Class<?> ensureLoaded = LockSupport.class;
113 
114         // Log after all static init is finished; if an installed logger uses any Futures
115         // methods, it shouldn't break in cases where reflection is missing/broken.
116         if (thrownAtomicReferenceFieldUpdaterFailure != null) {
log.log(Level.SEVERE, "SafeAtomicHelper is broken!", thrownAtomicReferenceFieldUpdaterFailure)117             log.log(Level.SEVERE, "SafeAtomicHelper is broken!",
118                     thrownAtomicReferenceFieldUpdaterFailure);
119         }
120     }
121 
122     /** Waiter links form a Treiber stack, in the {@link #waiters} field. */
123     private static final class Waiter {
124         static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);
125 
126         volatile @Nullable Thread thread;
127         volatile @Nullable Waiter next;
128 
129         /**
130          * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded
131          * before the ATOMIC_HELPER. Apparently this is possible on some android platforms.
132          */
Waiter(boolean unused)133         Waiter(boolean unused) {
134         }
135 
Waiter()136         Waiter() {
137             // avoid volatile write, write is made visible by subsequent CAS on waiters field
138             ATOMIC_HELPER.putThread(this, Thread.currentThread());
139         }
140 
141         // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters
142         // field.
setNext(Waiter next)143         void setNext(Waiter next) {
144             ATOMIC_HELPER.putNext(this, next);
145         }
146 
unpark()147         void unpark() {
148             // This is racy with removeWaiter. The consequence of the race is that we may
149             // spuriously call unpark even though the thread has already removed itself
150             // from the list. But even if we did use a CAS, that race would still exist
151             // (it would just be ever so slightly smaller).
152             Thread w = thread;
153             if (w != null) {
154                 thread = null;
155                 LockSupport.unpark(w);
156             }
157         }
158     }
159 
160     /**
161      * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted
162      * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are
163      * saved by two things.
164      *
165      * <ul>
166      * <li>This is only called when a waiting thread times out or is interrupted. Both of which
167      * should be rare.
168      * <li>The waiters list should be very short.
169      * </ul>
170      */
removeWaiter(Waiter node)171     private void removeWaiter(Waiter node) {
172         node.thread = null; // mark as 'deleted'
173         restart:
174         while (true) {
175             Waiter pred = null;
176             Waiter curr = waiters;
177             if (curr == Waiter.TOMBSTONE) {
178                 return; // give up if someone is calling complete
179             }
180             Waiter succ;
181             while (curr != null) {
182                 succ = curr.next;
183                 if (curr.thread != null) { // we aren't unlinking this node, update pred.
184                     pred = curr;
185                 } else if (pred != null) { // We are unlinking this node and it has a predecessor.
186                     pred.next = succ;
187                     if (pred.thread == null) {
188                         // We raced with another node that unlinked pred. Restart.
189                         continue restart;
190                     }
191                 } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head
192                     continue restart; // We raced with an add or complete
193                 }
194                 curr = succ;
195             }
196             break;
197         }
198     }
199 
200     /** Listeners also form a stack through the {@link #listeners} field. */
201     private static final class Listener {
202         static final Listener TOMBSTONE = new Listener(null, null);
203         final Runnable task;
204         final Executor executor;
205 
206         // writes to next are made visible by subsequent CAS's on the listeners field
207         @Nullable Listener next;
208 
Listener(Runnable task, Executor executor)209         Listener(Runnable task, Executor executor) {
210             this.task = task;
211             this.executor = executor;
212         }
213     }
214 
215     /** A special value to represent {@code null}. */
216     private static final Object NULL = new Object();
217 
218     /** A special value to represent failure, when {@link #setException} is called successfully. */
219     private static final class Failure {
220         static final Failure FALLBACK_INSTANCE =
221                 new Failure(
222                         new Throwable("Failure occurred while trying to finish a future.") {
223                             @Override
224                             public synchronized Throwable fillInStackTrace() {
225                                 return this; // no stack trace
226                             }
227                         });
228         final Throwable exception;
229 
Failure(Throwable exception)230         Failure(Throwable exception) {
231             this.exception = checkNotNull(exception);
232         }
233     }
234 
235     /** A special value to represent cancellation and the 'wasInterrupted' bit. */
236     private static final class Cancellation {
237         // constants to use when GENERATE_CANCELLATION_CAUSES = false
238         static final Cancellation CAUSELESS_INTERRUPTED;
239         static final Cancellation CAUSELESS_CANCELLED;
240 
241         static {
242             if (GENERATE_CANCELLATION_CAUSES) {
243                 CAUSELESS_CANCELLED = null;
244                 CAUSELESS_INTERRUPTED = null;
245             } else {
246                 CAUSELESS_CANCELLED = new Cancellation(false, null);
247                 CAUSELESS_INTERRUPTED = new Cancellation(true, null);
248             }
249         }
250 
251         final boolean wasInterrupted;
252         final @Nullable Throwable cause;
253 
Cancellation(boolean wasInterrupted, @Nullable Throwable cause)254         Cancellation(boolean wasInterrupted, @Nullable Throwable cause) {
255             this.wasInterrupted = wasInterrupted;
256             this.cause = cause;
257         }
258     }
259 
260     /** A special value that encodes the 'setFuture' state. */
261     private static final class SetFuture<V> implements Runnable {
262         final AbstractFuture<V> owner;
263         final ListenableFuture<? extends V> future;
264 
SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future)265         SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) {
266             this.owner = owner;
267             this.future = future;
268         }
269 
270         @Override
run()271         public void run() {
272             if (owner.value != this) {
273                 // nothing to do, we must have been cancelled, don't bother inspecting the future.
274                 return;
275             }
276             Object valueToSet = getFutureValue(future);
277             if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) {
278                 complete(owner);
279             }
280         }
281     }
282 
283     // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is
284     // available.
285     /**
286      * This field encodes the current state of the future.
287      *
288      * <p>The valid values are:
289      *
290      * <ul>
291      * <li>{@code null} initial state, nothing has happened.
292      * <li>{@link Cancellation} terminal state, {@code cancel} was called.
293      * <li>{@link Failure} terminal state, {@code setException} was called.
294      * <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
295      * <li>{@link #NULL} terminal state, {@code set(null)} was called.
296      * <li>Any other non-null value, terminal state, {@code set} was called with a non-null
297      * argument.
298      * </ul>
299      */
300     @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor.
301     volatile @Nullable Object value;
302 
303     /** All listeners. */
304     @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor.
305     volatile @Nullable Listener listeners;
306 
307     /** All waiting threads. */
308     @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor.
309     volatile @Nullable Waiter waiters;
310 
311     /** Constructor for use by subclasses. */
AbstractFuture()312     protected AbstractFuture() {
313     }
314 
315     // Gets and Timed Gets
316     //
317     // * Be responsive to interruption
318     // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the
319     //   waiters field.
320     // * Future completion is defined by when #value becomes non-null/non SetFuture
321     // * Future completion can be observed if the waiters field contains a TOMBSTONE
322 
323     // Timed Get
324     // There are a few design constraints to consider
325     // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I
326     //   have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the
327     //   timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of
328     //   spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for
329     //   similar purposes.
330     // * We want to behave reasonably for timeouts of 0
331     // * We are more responsive to completion than timeouts. This is because parkNanos depends on
332     //   system scheduling and as such we could either miss our deadline, or unpark() could be
333     //   delayed so that it looks like we timed out even though we didn't. For comparison FutureTask
334     //   respects completion preferably and AQS is non-deterministic (depends on where in the queue
335     //   the waiter is). If we wanted to be strict about it, we could store the unpark() time in
336     //   the Waiter node and we could use that to make a decision about whether or not we timed out
337     //   prior to being unparked.
338 
339     /**
340      * {@inheritDoc}
341      *
342      * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException}
343      * if the current thread is interrupted during the call, even if the value is already available.
344      *
345      * @throws CancellationException {@inheritDoc}
346      */
347     @Override
get(long timeout, TimeUnit unit)348     public final V get(long timeout, TimeUnit unit)
349             throws InterruptedException, TimeoutException, ExecutionException {
350         // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true)
351         // loop at the bottom and throw a timeoutexception.
352         // we rely on the implicit null check on unit.
353         final long timeoutNanos = unit.toNanos(timeout);
354         long remainingNanos = timeoutNanos;
355         if (Thread.interrupted()) {
356             throw new InterruptedException();
357         }
358         Object localValue = value;
359         if (localValue != null & !(localValue instanceof SetFuture)) {
360             return getDoneValue(localValue);
361         }
362         // we delay calling nanoTime until we know we will need to either park or spin
363         final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0;
364         long_wait_loop:
365         if (remainingNanos >= SPIN_THRESHOLD_NANOS) {
366             Waiter oldHead = waiters;
367             if (oldHead != Waiter.TOMBSTONE) {
368                 Waiter node = new Waiter();
369                 do {
370                     node.setNext(oldHead);
371                     if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
372                         while (true) {
373                             LockSupport.parkNanos(this, remainingNanos);
374                             // Check interruption first, if we woke up due to interruption we
375                             // need to honor that.
376                             if (Thread.interrupted()) {
377                                 removeWaiter(node);
378                                 throw new InterruptedException();
379                             }
380 
381                             // Otherwise re-read and check doneness. If we loop then it must have
382                             // been a spurious wakeup
383                             localValue = value;
384                             if (localValue != null & !(localValue instanceof SetFuture)) {
385                                 return getDoneValue(localValue);
386                             }
387 
388                             // timed out?
389                             remainingNanos = endNanos - System.nanoTime();
390                             if (remainingNanos < SPIN_THRESHOLD_NANOS) {
391                                 // Remove the waiter, one way or another we are done parking this
392                                 // thread.
393                                 removeWaiter(node);
394                                 break long_wait_loop; // jump down to the busy wait loop
395                             }
396                         }
397                     }
398                     oldHead = waiters; // re-read and loop.
399                 } while (oldHead != Waiter.TOMBSTONE);
400             }
401             // re-read value, if we get here then we must have observed a TOMBSTONE while trying
402             // to add a waiter.
403             return getDoneValue(value);
404         }
405         // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node
406         // on the waiters list
407         while (remainingNanos > 0) {
408             localValue = value;
409             if (localValue != null & !(localValue instanceof SetFuture)) {
410                 return getDoneValue(localValue);
411             }
412             if (Thread.interrupted()) {
413                 throw new InterruptedException();
414             }
415             remainingNanos = endNanos - System.nanoTime();
416         }
417 
418         String futureToString = toString();
419         final String unitString = unit.toString().toLowerCase(Locale.ROOT);
420         String message = "Waited " + timeout + " " + unit.toString().toLowerCase(Locale.ROOT);
421         // Only report scheduling delay if larger than our spin threshold - otherwise it's just
422         // noise
423         if (remainingNanos + SPIN_THRESHOLD_NANOS < 0) {
424             // We over-waited for our timeout.
425             message += " (plus ";
426             long overWaitNanos = -remainingNanos;
427             long overWaitUnits = unit.convert(overWaitNanos, TimeUnit.NANOSECONDS);
428             long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits);
429             boolean shouldShowExtraNanos =
430                     overWaitUnits == 0 || overWaitLeftoverNanos > SPIN_THRESHOLD_NANOS;
431             if (overWaitUnits > 0) {
432                 message += overWaitUnits + " " + unitString;
433                 if (shouldShowExtraNanos) {
434                     message += ",";
435                 }
436                 message += " ";
437             }
438             if (shouldShowExtraNanos) {
439                 message += overWaitLeftoverNanos + " nanoseconds ";
440             }
441 
442             message += "delay)";
443         }
444         // It's confusing to see a completed future in a timeout message; if isDone() returns false,
445         // then we know it must have given a pending toString value earlier. If not, then the future
446         // completed after the timeout expired, and the message might be success.
447         if (isDone()) {
448             throw new TimeoutException(message + " but future completed as timeout expired");
449         }
450         throw new TimeoutException(message + " for " + futureToString);
451     }
452 
453     /**
454      * {@inheritDoc}
455      *
456      * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException}
457      * if the current thread is interrupted during the call, even if the value is already available.
458      *
459      * @throws CancellationException {@inheritDoc}
460      */
461     @Override
get()462     public final V get() throws InterruptedException, ExecutionException {
463         if (Thread.interrupted()) {
464             throw new InterruptedException();
465         }
466         Object localValue = value;
467         if (localValue != null & !(localValue instanceof SetFuture)) {
468             return getDoneValue(localValue);
469         }
470         Waiter oldHead = waiters;
471         if (oldHead != Waiter.TOMBSTONE) {
472             Waiter node = new Waiter();
473             do {
474                 node.setNext(oldHead);
475                 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
476                     // we are on the stack, now wait for completion.
477                     while (true) {
478                         LockSupport.park(this);
479                         // Check interruption first, if we woke up due to interruption we need to
480                         // honor that.
481                         if (Thread.interrupted()) {
482                             removeWaiter(node);
483                             throw new InterruptedException();
484                         }
485                         // Otherwise re-read and check doneness. If we loop then it must have
486                         // been a spurious
487                         // wakeup
488                         localValue = value;
489                         if (localValue != null & !(localValue instanceof SetFuture)) {
490                             return getDoneValue(localValue);
491                         }
492                     }
493                 }
494                 oldHead = waiters; // re-read and loop.
495             } while (oldHead != Waiter.TOMBSTONE);
496         }
497         // re-read value, if we get here then we must have observed a TOMBSTONE while trying to
498         // add a waiter.
499         return getDoneValue(value);
500     }
501 
502     /** Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. */
getDoneValue(Object obj)503     private V getDoneValue(Object obj) throws ExecutionException {
504         // While this seems like it might be too branch-y, simple benchmarking proves it to be
505         // unmeasurable (comparing done AbstractFutures with immediateFuture)
506         if (obj instanceof Cancellation) {
507             throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause);
508         } else if (obj instanceof Failure) {
509             throw new ExecutionException(((Failure) obj).exception);
510         } else if (obj == NULL) {
511             return null;
512         } else {
513             @SuppressWarnings("unchecked") // this is the only other option
514                     V asV = (V) obj;
515             return asV;
516         }
517     }
518 
519     @Override
isDone()520     public final boolean isDone() {
521         final Object localValue = value;
522         return localValue != null & !(localValue instanceof SetFuture);
523     }
524 
525     @Override
isCancelled()526     public final boolean isCancelled() {
527         final Object localValue = value;
528         return localValue instanceof Cancellation;
529     }
530 
531     /**
532      * {@inheritDoc}
533      *
534      * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been
535      * {@linkplain #setFuture set asynchronously}, then the cancellation will also be propagated
536      * to the delegate {@code Future} that was supplied in the {@code setFuture} call.
537      *
538      * <p>Rather than override this method to perform additional cancellation work or cleanup,
539      * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link
540      * #wasInterrupted} as necessary. This ensures that the work is done even if the future is
541      * cancelled without a call to {@code cancel}, such as by calling {@code
542      * setFuture(cancelledFuture)}.
543      */
544     @Override
cancel(boolean mayInterruptIfRunning)545     public final boolean cancel(boolean mayInterruptIfRunning) {
546         Object localValue = value;
547         boolean rValue = false;
548         if (localValue == null | localValue instanceof SetFuture) {
549             // Try to delay allocating the exception. At this point we may still lose the CAS,
550             // but it is certainly less likely.
551             Object valueToSet =
552                     GENERATE_CANCELLATION_CAUSES
553                             ? new Cancellation(
554                             mayInterruptIfRunning,
555                             new CancellationException("Future.cancel() was called."))
556                             : (mayInterruptIfRunning
557                                     ? Cancellation.CAUSELESS_INTERRUPTED
558                                     : Cancellation.CAUSELESS_CANCELLED);
559             AbstractFuture<?> abstractFuture = this;
560             while (true) {
561                 if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {
562                     rValue = true;
563                     // We call interuptTask before calling complete(), which is consistent with
564                     // FutureTask
565                     if (mayInterruptIfRunning) {
566                         abstractFuture.interruptTask();
567                     }
568                     complete(abstractFuture);
569                     if (localValue instanceof SetFuture) {
570                         // propagate cancellation to the future set in setfuture, this is racy,
571                         // and we don't
572                         // care if we are successful or not.
573                         ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future;
574                         if (futureToPropagateTo instanceof AbstractFuture) {
575                             // If the future is a trusted then we specifically avoid
576                             // calling cancel() this has 2 benefits
577                             // 1. for long chains of futures strung together with setFuture we
578                             // consume less stack
579                             // 2. we avoid allocating Cancellation objects at every level of the
580                             // cancellation chain
581                             // We can only do this for TrustedFuture, because TrustedFuture
582                             // .cancel is final and does nothing but delegate to this method.
583                             AbstractFuture<?>
584                                     trusted = (AbstractFuture<?>) futureToPropagateTo;
585                             localValue = trusted.value;
586                             if (localValue == null | localValue instanceof SetFuture) {
587                                 abstractFuture = trusted;
588                                 continue; // loop back up and try to complete the new future
589                             }
590                         } else {
591                             // not a TrustedFuture, call cancel directly.
592                             futureToPropagateTo.cancel(mayInterruptIfRunning);
593                         }
594                     }
595                     break;
596                 }
597                 // obj changed, reread
598                 localValue = abstractFuture.value;
599                 if (!(localValue instanceof SetFuture)) {
600                     // obj cannot be null at this point, because value can only change from null
601                     // to non-null. So if value changed (and it did since we lost the CAS),
602                     // then it cannot be null and since it isn't a SetFuture, then the future must
603                     // be done and we should exit the loop
604                     break;
605                 }
606             }
607         }
608         return rValue;
609     }
610 
611     /**
612      * Subclasses can override this method to implement interruption of the future's computation.
613      * The method is invoked automatically by a successful call to
614      * {@link #cancel(boolean) cancel(true)}.
615      *
616      * <p>The default implementation does nothing.
617      *
618      * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, checking
619      * {@link #wasInterrupted} to decide whether to interrupt your task.
620      *
621      * @since 10.0
622      */
interruptTask()623     protected void interruptTask() {
624     }
625 
626     /**
627      * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code
628      * true}.
629      *
630      * @since 14.0
631      */
wasInterrupted()632     protected final boolean wasInterrupted() {
633         final Object localValue = value;
634         return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
635     }
636 
637     /**
638      * {@inheritDoc}
639      *
640      * @since 10.0
641      */
642     @Override
addListener(Runnable listener, Executor executor)643     public final void addListener(Runnable listener, Executor executor) {
644         checkNotNull(listener);
645         checkNotNull(executor);
646         Listener oldHead = listeners;
647         if (oldHead != Listener.TOMBSTONE) {
648             Listener newNode = new Listener(listener, executor);
649             do {
650                 newNode.next = oldHead;
651                 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
652                     return;
653                 }
654                 oldHead = listeners; // re-read
655             } while (oldHead != Listener.TOMBSTONE);
656         }
657         // If we get here then the Listener TOMBSTONE was set, which means the future is done, call
658         // the listener.
659         executeListener(listener, executor);
660     }
661 
662     /**
663      * Sets the result of this {@code Future} unless this {@code Future} has already been
664      * cancelled or set (including {@linkplain #setFuture set asynchronously}).
665      * When a call to this method returns, the {@code Future} is guaranteed to be
666      * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which case it returns
667      * {@code true}). If it returns {@code false}, the {@code Future} may have previously been set
668      * asynchronously, in which case its result may not be known yet. That result,
669      * though not yet known, cannot be overridden by a call to a {@code set*} method,
670      * only by a call to {@link #cancel}.
671      *
672      * @param value the value to be used as the result
673      * @return true if the attempt was accepted, completing the {@code Future}
674      */
set(@ullable V value)675     protected boolean set(@Nullable V value) {
676         Object valueToSet = value == null ? NULL : value;
677         if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
678             complete(this);
679             return true;
680         }
681         return false;
682     }
683 
684     /**
685      * Sets the failed result of this {@code Future} unless this {@code Future} has already been
686      * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this
687      * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only
688      * if</b>
689      * the call was accepted (in which case it returns {@code true}). If it returns {@code
690      * false}, the
691      * {@code Future} may have previously been set asynchronously, in which case its result may
692      * not be
693      * known yet. That result, though not yet known, cannot be overridden by a call to a {@code
694      * set*}
695      * method, only by a call to {@link #cancel}.
696      *
697      * @param throwable the exception to be used as the failed result
698      * @return true if the attempt was accepted, completing the {@code Future}
699      */
setException(Throwable throwable)700     protected boolean setException(Throwable throwable) {
701         Object valueToSet = new Failure(checkNotNull(throwable));
702         if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
703             complete(this);
704             return true;
705         }
706         return false;
707     }
708 
709     /**
710      * Sets the result of this {@code Future} to match the supplied input {@code Future} once the
711      * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set
712      * (including "set asynchronously," defined below).
713      *
714      * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the
715      * call is accepted, then this future is guaranteed to have been completed with the supplied
716      * future by the time this method returns. If the supplied future is not done and the call
717      * is accepted, then the future will be <i>set asynchronously</i>. Note that such a result,
718      * though not yet known, cannot be overridden by a call to a {@code set*} method,
719      * only by a call to {@link #cancel}.
720      *
721      * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later
722      * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to
723      * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code
724      * Future}.
725      *
726      * <p>Note that, even if the supplied future is cancelled and it causes this future to complete,
727      * it will never trigger interruption behavior. In particular, it will not cause this future to
728      * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not
729      * return {@code true}.
730      *
731      * @param future the future to delegate to
732      * @return true if the attempt was accepted, indicating that the {@code Future} was not
733      * previously cancelled or set.
734      * @since 19.0
735      */
setFuture(ListenableFuture<? extends V> future)736     protected boolean setFuture(ListenableFuture<? extends V> future) {
737         checkNotNull(future);
738         Object localValue = value;
739         if (localValue == null) {
740             if (future.isDone()) {
741                 Object value = getFutureValue(future);
742                 if (ATOMIC_HELPER.casValue(this, null, value)) {
743                     complete(this);
744                     return true;
745                 }
746                 return false;
747             }
748             SetFuture valueToSet = new SetFuture<V>(this, future);
749             if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
750                 // the listener is responsible for calling completeWithFuture, directExecutor is
751                 // appropriate since all we are doing is unpacking a completed future
752                 // which should be fast.
753                 try {
754                     future.addListener(valueToSet, DirectExecutor.INSTANCE);
755                 } catch (Throwable t) {
756                     // addListener has thrown an exception! SetFuture.run can't throw any
757                     // exceptions so this must have been caused by addListener itself.
758                     // The most likely explanation is a misconfigured mock.
759                     // Try to switch to Failure.
760                     Failure failure;
761                     try {
762                         failure = new Failure(t);
763                     } catch (Throwable oomMostLikely) {
764                         failure = Failure.FALLBACK_INSTANCE;
765                     }
766                     // Note: The only way this CAS could fail is if cancel() has raced with us.
767                     // That is ok.
768                     boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure);
769                 }
770                 return true;
771             }
772             localValue = value; // we lost the cas, fall through and maybe cancel
773         }
774         // The future has already been set to something. If it is cancellation we should cancel the
775         // incoming future.
776         if (localValue instanceof Cancellation) {
777             // we don't care if it fails, this is best-effort.
778             future.cancel(((Cancellation) localValue).wasInterrupted);
779         }
780         return false;
781     }
782 
783     /**
784      * Returns a value that satisfies the contract of the {@link #value} field based on the state of
785      * given future.
786      *
787      * <p>This is approximately the inverse of {@link #getDoneValue(Object)}
788      */
789     @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor.
getFutureValue(ListenableFuture<?> future)790     static Object getFutureValue(ListenableFuture<?> future) {
791         if (future instanceof AbstractFuture) {
792             // Break encapsulation for TrustedFuture instances since we know that subclasses cannot
793             // override .get() (since it is final) and therefore this is equivalent to calling
794             // .get() and unpacking the exceptions like we do below (just much faster because it is
795             // a single field read instead of a read, several branches and possibly
796             // creating exceptions).
797             Object v = ((AbstractFuture<?>) future).value;
798             if (v instanceof Cancellation) {
799                 // If the other future was interrupted, clear the interrupted bit while
800                 // preserving the cause this will make it consistent with how non-trustedfutures
801                 // work which cannot propagate the wasInterrupted bit
802                 Cancellation c = (Cancellation) v;
803                 if (c.wasInterrupted) {
804                     v = c.cause != null ? new Cancellation(/* wasInterrupted= */ false, c.cause)
805                                     : Cancellation.CAUSELESS_CANCELLED;
806                 }
807             }
808             return v;
809         }
810         boolean wasCancelled = future.isCancelled();
811         // Don't allocate a CancellationException if it's not necessary
812         if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) {
813             return Cancellation.CAUSELESS_CANCELLED;
814         }
815         // Otherwise calculate the value by calling .get()
816         try {
817             Object v = getUninterruptibly(future);
818             return v == null ? NULL : v;
819         } catch (ExecutionException exception) {
820             return new Failure(exception.getCause());
821         } catch (CancellationException cancellation) {
822             if (!wasCancelled) {
823                 return new Failure(
824                         new IllegalArgumentException(
825                                 "get() threw CancellationException, despite reporting isCancelled"
826                                         + "() == false: "
827                                         + future,
828                                 cancellation));
829             }
830             return new Cancellation(false, cancellation);
831         } catch (Throwable t) {
832             return new Failure(t);
833         }
834     }
835 
836     /**
837      * internal dependency on other /util/concurrent classes.
838      */
getUninterruptibly(Future<V> future)839     private static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
840         boolean interrupted = false;
841         try {
842             while (true) {
843                 try {
844                     return future.get();
845                 } catch (InterruptedException e) {
846                     interrupted = true;
847                 }
848             }
849         } finally {
850             if (interrupted) {
851                 Thread.currentThread().interrupt();
852             }
853         }
854     }
855 
856     /** Unblocks all threads and runs all listeners. */
857     @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor.
complete(AbstractFuture<?> future)858     static void complete(AbstractFuture<?> future) {
859         Listener next = null;
860         outer:
861         while (true) {
862             future.releaseWaiters();
863             // We call this before the listeners in order to avoid needing to manage a separate
864             // stack data structure for them.  Also, some implementations rely on this running
865             // prior to listeners so that the cleanup work is visible to listeners.
866             // afterDone() should be generally fast and only used for cleanup work... but in
867             // theory can also be recursive and create StackOverflowErrors
868             future.afterDone();
869             // push the current set of listeners onto next
870             next = future.clearListeners(next);
871             future = null;
872             while (next != null) {
873                 Listener curr = next;
874                 next = next.next;
875                 Runnable task = curr.task;
876                 if (task instanceof SetFuture) {
877                     SetFuture<?> setFuture = (SetFuture<?>) task;
878                     // We unwind setFuture specifically to avoid StackOverflowErrors in the case
879                     // of long chains of SetFutures
880                     // Handling this special case is important because there is no way to pass an
881                     // executor to setFuture, so a user couldn't break the chain by doing this
882                     // themselves. It is also potentially common if someone writes a recursive
883                     // Futures.transformAsync transformer.
884                     future = setFuture.owner;
885                     if (future.value == setFuture) {
886                         Object valueToSet = getFutureValue(setFuture.future);
887                         if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
888                             continue outer;
889                         }
890                     }
891                     // other wise the future we were trying to set is already done.
892                 } else {
893                     executeListener(task, curr.executor);
894                 }
895             }
896             break;
897         }
898     }
899 
900     /**
901      * Callback method that is called exactly once after the future is completed.
902      *
903      * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it.
904      *
905      * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is
906      * intended for very lightweight cleanup work, for example, timing statistics or clearing
907      * fields.
908      * If your task does anything heavier consider, just using a listener with an executor.
909      *
910      * @since 20.0
911      */
afterDone()912     protected void afterDone() {
913     }
914 
915     /**
916      * If this future has been cancelled (and possibly interrupted), cancels (and possibly
917      * interrupts) the given future (if available).
918      */
919     @SuppressWarnings("ParameterNotNullable")
maybePropagateCancellationTo(@ullable Future<?> related)920     final void maybePropagateCancellationTo(@Nullable Future<?> related) {
921         if (related != null & isCancelled()) {
922             related.cancel(wasInterrupted());
923         }
924     }
925 
926     /** Releases all threads in the {@link #waiters} list, and clears the list. */
releaseWaiters()927     private void releaseWaiters() {
928         Waiter head;
929         do {
930             head = waiters;
931         } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE));
932         for (Waiter currentWaiter = head; currentWaiter != null;
933                 currentWaiter = currentWaiter.next) {
934             currentWaiter.unpark();
935         }
936     }
937 
938     /**
939      * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently
940      * added first.
941      */
clearListeners(Listener onto)942     private Listener clearListeners(Listener onto) {
943         // We need to
944         // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to
945         //    to synchronize with us
946         // 2. reverse the linked list, because despite our rather clear contract, people depend
947         //    on us executing listeners in the order they were added
948         // 3. push all the items onto 'onto' and return the new head of the stack
949         Listener head;
950         do {
951             head = listeners;
952         } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE));
953         Listener reversedList = onto;
954         while (head != null) {
955             Listener tmp = head;
956             head = head.next;
957             tmp.next = reversedList;
958             reversedList = tmp;
959         }
960         return reversedList;
961     }
962 
963     // TODO(clm): move parts into a default method on ListenableFuture?
964     @Override
toString()965     public String toString() {
966         StringBuilder builder = new StringBuilder().append(super.toString()).append("[status=");
967         if (isCancelled()) {
968             builder.append("CANCELLED");
969         } else if (isDone()) {
970             addDoneString(builder);
971         } else {
972             String pendingDescription;
973             try {
974                 pendingDescription = pendingToString();
975             } catch (RuntimeException e) {
976                 // Don't call getMessage or toString() on the exception, in case the exception
977                 // thrown by the subclass is implemented with bugs similar to the subclass.
978                 pendingDescription = "Exception thrown from implementation: " + e.getClass();
979             }
980             // The future may complete during or before the call to getPendingToString, so we use
981             // null as a signal that we should try checking if the future is done again.
982             if (pendingDescription != null && !pendingDescription.isEmpty()) {
983                 builder.append("PENDING, info=[").append(pendingDescription).append("]");
984             } else if (isDone()) {
985                 addDoneString(builder);
986             } else {
987                 builder.append("PENDING");
988             }
989         }
990         return builder.append("]").toString();
991     }
992 
993     /**
994      * Provide a human-readable explanation of why this future has not yet completed.
995      *
996      * @return null if an explanation cannot be provided because the future is done.
997      * @since 23.0
998      */
pendingToString()999     protected @Nullable String pendingToString() {
1000         Object localValue = value;
1001         if (localValue instanceof SetFuture) {
1002             return "setFuture=[" + userObjectToString(((SetFuture) localValue).future) + "]";
1003         } else if (this instanceof ScheduledFuture) {
1004             return "remaining delay=["
1005                     + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS)
1006                     + " ms]";
1007         }
1008         return null;
1009     }
1010 
addDoneString(StringBuilder builder)1011     private void addDoneString(StringBuilder builder) {
1012         try {
1013             V value = getUninterruptibly(this);
1014             builder.append("SUCCESS, result=[").append(userObjectToString(value)).append("]");
1015         } catch (ExecutionException e) {
1016             builder.append("FAILURE, cause=[").append(e.getCause()).append("]");
1017         } catch (CancellationException e) {
1018             builder.append("CANCELLED"); // shouldn't be reachable
1019         } catch (RuntimeException e) {
1020             builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]");
1021         }
1022     }
1023 
1024     /** Helper for printing user supplied objects into our toString method. */
userObjectToString(Object o)1025     private String userObjectToString(Object o) {
1026         // This is some basic recursion detection for when people create cycles via set/setFuture
1027         // This is however only partial protection though since it only detects self loops.  We
1028         // could detect arbitrary cycles using a thread local or possibly by catching
1029         // StackOverflowExceptions but this should be a good enough solution
1030         // (it is also what jdk collections do in these cases)
1031         if (o == this) {
1032             return "this future";
1033         }
1034         return String.valueOf(o);
1035     }
1036 
1037     /**
1038      * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain
1039      * RuntimeException runtime exceptions} thrown by the executor.
1040      */
executeListener(Runnable runnable, Executor executor)1041     private static void executeListener(Runnable runnable, Executor executor) {
1042         try {
1043             executor.execute(runnable);
1044         } catch (RuntimeException e) {
1045             // Log it and keep going -- bad runnable and/or executor. Don't punish the other
1046             // runnables if we're given a bad one. We only catch RuntimeException
1047             // because we want Errors to propagate up.
1048             log.log(
1049                     Level.SEVERE,
1050                     "RuntimeException while executing runnable " + runnable + " with executor "
1051                             + executor,
1052                     e);
1053         }
1054     }
1055 
1056     private abstract static class AtomicHelper {
1057         /** Non volatile write of the thread to the {@link Waiter#thread} field. */
putThread(Waiter waiter, Thread newValue)1058         abstract void putThread(Waiter waiter, Thread newValue);
1059 
1060         /** Non volatile write of the waiter to the {@link Waiter#next} field. */
putNext(Waiter waiter, Waiter newValue)1061         abstract void putNext(Waiter waiter, Waiter newValue);
1062 
1063         /** Performs a CAS operation on the {@link #waiters} field. */
casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update)1064         abstract boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update);
1065 
1066         /** Performs a CAS operation on the {@link #listeners} field. */
casListeners(AbstractFuture<?> future, Listener expect, Listener update)1067         abstract boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update);
1068 
1069         /** Performs a CAS operation on the {@link #value} field. */
casValue(AbstractFuture<?> future, Object expect, Object update)1070         abstract boolean casValue(AbstractFuture<?> future, Object expect, Object update);
1071     }
1072 
1073     /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */
1074     private static final class SafeAtomicHelper extends AtomicHelper {
1075         final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater;
1076         final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater;
1077         final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater;
1078         final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater;
1079         final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater;
1080 
SafeAtomicHelper( AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater)1081         SafeAtomicHelper(
1082                 AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater,
1083                 AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater,
1084                 AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater,
1085                 AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater,
1086                 AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) {
1087             this.waiterThreadUpdater = waiterThreadUpdater;
1088             this.waiterNextUpdater = waiterNextUpdater;
1089             this.waitersUpdater = waitersUpdater;
1090             this.listenersUpdater = listenersUpdater;
1091             this.valueUpdater = valueUpdater;
1092         }
1093 
1094         @Override
putThread(Waiter waiter, Thread newValue)1095         void putThread(Waiter waiter, Thread newValue) {
1096             waiterThreadUpdater.lazySet(waiter, newValue);
1097         }
1098 
1099         @Override
putNext(Waiter waiter, Waiter newValue)1100         void putNext(Waiter waiter, Waiter newValue) {
1101             waiterNextUpdater.lazySet(waiter, newValue);
1102         }
1103 
1104         @Override
casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update)1105         boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
1106             return waitersUpdater.compareAndSet(future, expect, update);
1107         }
1108 
1109         @Override
casListeners(AbstractFuture<?> future, Listener expect, Listener update)1110         boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
1111             return listenersUpdater.compareAndSet(future, expect, update);
1112         }
1113 
1114         @Override
casValue(AbstractFuture<?> future, Object expect, Object update)1115         boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
1116             return valueUpdater.compareAndSet(future, expect, update);
1117         }
1118     }
1119 
1120     /**
1121      * {@link AtomicHelper} based on {@code synchronized} and volatile writes.
1122      *
1123      * <p>This is an implementation of last resort for when certain basic VM features are broken
1124      * (like AtomicReferenceFieldUpdater).
1125      */
1126     private static final class SynchronizedHelper extends AtomicHelper {
SynchronizedHelper()1127         SynchronizedHelper() {
1128         }
1129 
1130         @Override
putThread(Waiter waiter, Thread newValue)1131         void putThread(Waiter waiter, Thread newValue) {
1132             waiter.thread = newValue;
1133         }
1134 
1135         @Override
putNext(Waiter waiter, Waiter newValue)1136         void putNext(Waiter waiter, Waiter newValue) {
1137             waiter.next = newValue;
1138         }
1139 
1140         @Override
casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update)1141         boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
1142             synchronized (future) {
1143                 if (future.waiters == expect) {
1144                     future.waiters = update;
1145                     return true;
1146                 }
1147                 return false;
1148             }
1149         }
1150 
1151         @Override
casListeners(AbstractFuture<?> future, Listener expect, Listener update)1152         boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
1153             synchronized (future) {
1154                 if (future.listeners == expect) {
1155                     future.listeners = update;
1156                     return true;
1157                 }
1158                 return false;
1159             }
1160         }
1161 
1162         @Override
casValue(AbstractFuture<?> future, Object expect, Object update)1163         boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
1164             synchronized (future) {
1165                 if (future.value == expect) {
1166                     future.value = update;
1167                     return true;
1168                 }
1169                 return false;
1170             }
1171         }
1172     }
1173 
cancellationExceptionWithCause( @ullable String message, @Nullable Throwable cause)1174     private static CancellationException cancellationExceptionWithCause(
1175             @Nullable String message, @Nullable Throwable cause) {
1176         CancellationException exception = new CancellationException(message);
1177         exception.initCause(cause);
1178         return exception;
1179     }
1180 
1181     @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor.
checkNotNull(@ullable T reference)1182     static <T> @NonNull T checkNotNull(@Nullable T reference) {
1183         if (reference == null) {
1184             throw new NullPointerException();
1185         }
1186         return reference;
1187     }
1188 }
1189