1 /* 2 * Copyright 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package androidx.work.impl.utils.futures; 18 19 import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 20 21 import androidx.annotation.RestrictTo; 22 23 import com.google.common.util.concurrent.ListenableFuture; 24 25 import org.jspecify.annotations.NonNull; 26 import org.jspecify.annotations.Nullable; 27 28 import java.util.Locale; 29 import java.util.concurrent.CancellationException; 30 import java.util.concurrent.ExecutionException; 31 import java.util.concurrent.Executor; 32 import java.util.concurrent.Future; 33 import java.util.concurrent.ScheduledFuture; 34 import java.util.concurrent.TimeUnit; 35 import java.util.concurrent.TimeoutException; 36 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 37 import java.util.concurrent.locks.LockSupport; 38 import java.util.logging.Level; 39 import java.util.logging.Logger; 40 41 /** 42 * Cloned from concurrent-futures package to avoid AndroidX namespace issues since there is no 43 * supportlib 28.* equivalent of this class. 44 * 45 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More 46 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture}, 47 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an 48 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, 49 * com.google.common.base.Function, Executor) Futures.transform} and {@link 50 * Futures#catching(ListenableFuture, Class, com.google.common.base.Function, 51 * Executor) Futures.catching}. 52 * 53 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way 54 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link 55 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override 56 * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses 57 * should rarely override other methods. 58 * 59 * @author Sven Mawson 60 * @author Luke Sandberg 61 * @since 1.0 62 */ 63 @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) 64 @SuppressWarnings("ShortCircuitBoolean") // we use non-short circuiting comparisons intentionally 65 public abstract class AbstractFuture<V> implements ListenableFuture<V> { 66 67 // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || 68 69 @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. 70 static final boolean GENERATE_CANCELLATION_CAUSES = 71 Boolean.parseBoolean( 72 System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); 73 74 // Logger to log exceptions caught when running listeners. 75 private static final Logger log = Logger.getLogger(AbstractFuture.class.getName()); 76 77 // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of 78 // blocking. This value is what AbstractQueuedSynchronizer uses. 79 private static final long SPIN_THRESHOLD_NANOS = 1000L; 80 81 @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. 82 static final AtomicHelper ATOMIC_HELPER; 83 84 static { 85 AtomicHelper helper; 86 Throwable thrownAtomicReferenceFieldUpdaterFailure = null; 87 88 // The access control checks that ARFU does means the caller class has to be 89 // AbstractFuture instead of SafeAtomicHelper, so we annoyingly define these here 90 try { 91 helper = 92 new SafeAtomicHelper( 93 newUpdater(Waiter.class, Thread.class, "thread"), 94 newUpdater(Waiter.class, Waiter.class, "next"), 95 newUpdater(AbstractFuture.class, Waiter.class, "waiters"), 96 newUpdater(AbstractFuture.class, Listener.class, "listeners"), 97 newUpdater(AbstractFuture.class, Object.class, "value")); 98 } catch (Throwable atomicReferenceFieldUpdaterFailure) { 99 // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause 100 // getDeclaredField to throw a NoSuchFieldException when the field is definitely 101 // there. For these users fallback to a suboptimal implementation, 102 // based on synchronized. This will be a definite performance hit to those users. 103 thrownAtomicReferenceFieldUpdaterFailure = atomicReferenceFieldUpdaterFailure; 104 helper = new SynchronizedHelper(); 105 } 106 107 ATOMIC_HELPER = helper; 108 109 // Prevent rare disastrous classloading in first call to LockSupport.park. 110 // See: https://bugs.openjdk.java.net/browse/JDK-8074773 111 @SuppressWarnings("unused") 112 Class<?> ensureLoaded = LockSupport.class; 113 114 // Log after all static init is finished; if an installed logger uses any Futures 115 // methods, it shouldn't break in cases where reflection is missing/broken. 116 if (thrownAtomicReferenceFieldUpdaterFailure != null) { log.log(Level.SEVERE, "SafeAtomicHelper is broken!", thrownAtomicReferenceFieldUpdaterFailure)117 log.log(Level.SEVERE, "SafeAtomicHelper is broken!", 118 thrownAtomicReferenceFieldUpdaterFailure); 119 } 120 } 121 122 /** Waiter links form a Treiber stack, in the {@link #waiters} field. */ 123 private static final class Waiter { 124 static final Waiter TOMBSTONE = new Waiter(false /* ignored param */); 125 126 volatile @Nullable Thread thread; 127 volatile @Nullable Waiter next; 128 129 /** 130 * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded 131 * before the ATOMIC_HELPER. Apparently this is possible on some android platforms. 132 */ Waiter(boolean unused)133 Waiter(boolean unused) { 134 } 135 Waiter()136 Waiter() { 137 // avoid volatile write, write is made visible by subsequent CAS on waiters field 138 ATOMIC_HELPER.putThread(this, Thread.currentThread()); 139 } 140 141 // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters 142 // field. setNext(Waiter next)143 void setNext(Waiter next) { 144 ATOMIC_HELPER.putNext(this, next); 145 } 146 unpark()147 void unpark() { 148 // This is racy with removeWaiter. The consequence of the race is that we may 149 // spuriously call unpark even though the thread has already removed itself 150 // from the list. But even if we did use a CAS, that race would still exist 151 // (it would just be ever so slightly smaller). 152 Thread w = thread; 153 if (w != null) { 154 thread = null; 155 LockSupport.unpark(w); 156 } 157 } 158 } 159 160 /** 161 * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted 162 * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are 163 * saved by two things. 164 * 165 * <ul> 166 * <li>This is only called when a waiting thread times out or is interrupted. Both of which 167 * should be rare. 168 * <li>The waiters list should be very short. 169 * </ul> 170 */ removeWaiter(Waiter node)171 private void removeWaiter(Waiter node) { 172 node.thread = null; // mark as 'deleted' 173 restart: 174 while (true) { 175 Waiter pred = null; 176 Waiter curr = waiters; 177 if (curr == Waiter.TOMBSTONE) { 178 return; // give up if someone is calling complete 179 } 180 Waiter succ; 181 while (curr != null) { 182 succ = curr.next; 183 if (curr.thread != null) { // we aren't unlinking this node, update pred. 184 pred = curr; 185 } else if (pred != null) { // We are unlinking this node and it has a predecessor. 186 pred.next = succ; 187 if (pred.thread == null) { 188 // We raced with another node that unlinked pred. Restart. 189 continue restart; 190 } 191 } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head 192 continue restart; // We raced with an add or complete 193 } 194 curr = succ; 195 } 196 break; 197 } 198 } 199 200 /** Listeners also form a stack through the {@link #listeners} field. */ 201 private static final class Listener { 202 static final Listener TOMBSTONE = new Listener(null, null); 203 final Runnable task; 204 final Executor executor; 205 206 // writes to next are made visible by subsequent CAS's on the listeners field 207 @Nullable Listener next; 208 Listener(Runnable task, Executor executor)209 Listener(Runnable task, Executor executor) { 210 this.task = task; 211 this.executor = executor; 212 } 213 } 214 215 /** A special value to represent {@code null}. */ 216 private static final Object NULL = new Object(); 217 218 /** A special value to represent failure, when {@link #setException} is called successfully. */ 219 private static final class Failure { 220 static final Failure FALLBACK_INSTANCE = 221 new Failure( 222 new Throwable("Failure occurred while trying to finish a future.") { 223 @Override 224 public synchronized Throwable fillInStackTrace() { 225 return this; // no stack trace 226 } 227 }); 228 final Throwable exception; 229 Failure(Throwable exception)230 Failure(Throwable exception) { 231 this.exception = checkNotNull(exception); 232 } 233 } 234 235 /** A special value to represent cancellation and the 'wasInterrupted' bit. */ 236 private static final class Cancellation { 237 // constants to use when GENERATE_CANCELLATION_CAUSES = false 238 static final Cancellation CAUSELESS_INTERRUPTED; 239 static final Cancellation CAUSELESS_CANCELLED; 240 241 static { 242 if (GENERATE_CANCELLATION_CAUSES) { 243 CAUSELESS_CANCELLED = null; 244 CAUSELESS_INTERRUPTED = null; 245 } else { 246 CAUSELESS_CANCELLED = new Cancellation(false, null); 247 CAUSELESS_INTERRUPTED = new Cancellation(true, null); 248 } 249 } 250 251 final boolean wasInterrupted; 252 final @Nullable Throwable cause; 253 Cancellation(boolean wasInterrupted, @Nullable Throwable cause)254 Cancellation(boolean wasInterrupted, @Nullable Throwable cause) { 255 this.wasInterrupted = wasInterrupted; 256 this.cause = cause; 257 } 258 } 259 260 /** A special value that encodes the 'setFuture' state. */ 261 private static final class SetFuture<V> implements Runnable { 262 final AbstractFuture<V> owner; 263 final ListenableFuture<? extends V> future; 264 SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future)265 SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) { 266 this.owner = owner; 267 this.future = future; 268 } 269 270 @Override run()271 public void run() { 272 if (owner.value != this) { 273 // nothing to do, we must have been cancelled, don't bother inspecting the future. 274 return; 275 } 276 Object valueToSet = getFutureValue(future); 277 if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { 278 complete(owner); 279 } 280 } 281 } 282 283 // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is 284 // available. 285 /** 286 * This field encodes the current state of the future. 287 * 288 * <p>The valid values are: 289 * 290 * <ul> 291 * <li>{@code null} initial state, nothing has happened. 292 * <li>{@link Cancellation} terminal state, {@code cancel} was called. 293 * <li>{@link Failure} terminal state, {@code setException} was called. 294 * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. 295 * <li>{@link #NULL} terminal state, {@code set(null)} was called. 296 * <li>Any other non-null value, terminal state, {@code set} was called with a non-null 297 * argument. 298 * </ul> 299 */ 300 @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. 301 volatile @Nullable Object value; 302 303 /** All listeners. */ 304 @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. 305 volatile @Nullable Listener listeners; 306 307 /** All waiting threads. */ 308 @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. 309 volatile @Nullable Waiter waiters; 310 311 /** Constructor for use by subclasses. */ AbstractFuture()312 protected AbstractFuture() { 313 } 314 315 // Gets and Timed Gets 316 // 317 // * Be responsive to interruption 318 // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the 319 // waiters field. 320 // * Future completion is defined by when #value becomes non-null/non SetFuture 321 // * Future completion can be observed if the waiters field contains a TOMBSTONE 322 323 // Timed Get 324 // There are a few design constraints to consider 325 // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I 326 // have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the 327 // timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of 328 // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for 329 // similar purposes. 330 // * We want to behave reasonably for timeouts of 0 331 // * We are more responsive to completion than timeouts. This is because parkNanos depends on 332 // system scheduling and as such we could either miss our deadline, or unpark() could be 333 // delayed so that it looks like we timed out even though we didn't. For comparison FutureTask 334 // respects completion preferably and AQS is non-deterministic (depends on where in the queue 335 // the waiter is). If we wanted to be strict about it, we could store the unpark() time in 336 // the Waiter node and we could use that to make a decision about whether or not we timed out 337 // prior to being unparked. 338 339 /** 340 * {@inheritDoc} 341 * 342 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} 343 * if the current thread is interrupted during the call, even if the value is already available. 344 * 345 * @throws CancellationException {@inheritDoc} 346 */ 347 @Override get(long timeout, TimeUnit unit)348 public final V get(long timeout, TimeUnit unit) 349 throws InterruptedException, TimeoutException, ExecutionException { 350 // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) 351 // loop at the bottom and throw a timeoutexception. 352 // we rely on the implicit null check on unit. 353 final long timeoutNanos = unit.toNanos(timeout); 354 long remainingNanos = timeoutNanos; 355 if (Thread.interrupted()) { 356 throw new InterruptedException(); 357 } 358 Object localValue = value; 359 if (localValue != null & !(localValue instanceof SetFuture)) { 360 return getDoneValue(localValue); 361 } 362 // we delay calling nanoTime until we know we will need to either park or spin 363 final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0; 364 long_wait_loop: 365 if (remainingNanos >= SPIN_THRESHOLD_NANOS) { 366 Waiter oldHead = waiters; 367 if (oldHead != Waiter.TOMBSTONE) { 368 Waiter node = new Waiter(); 369 do { 370 node.setNext(oldHead); 371 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 372 while (true) { 373 LockSupport.parkNanos(this, remainingNanos); 374 // Check interruption first, if we woke up due to interruption we 375 // need to honor that. 376 if (Thread.interrupted()) { 377 removeWaiter(node); 378 throw new InterruptedException(); 379 } 380 381 // Otherwise re-read and check doneness. If we loop then it must have 382 // been a spurious wakeup 383 localValue = value; 384 if (localValue != null & !(localValue instanceof SetFuture)) { 385 return getDoneValue(localValue); 386 } 387 388 // timed out? 389 remainingNanos = endNanos - System.nanoTime(); 390 if (remainingNanos < SPIN_THRESHOLD_NANOS) { 391 // Remove the waiter, one way or another we are done parking this 392 // thread. 393 removeWaiter(node); 394 break long_wait_loop; // jump down to the busy wait loop 395 } 396 } 397 } 398 oldHead = waiters; // re-read and loop. 399 } while (oldHead != Waiter.TOMBSTONE); 400 } 401 // re-read value, if we get here then we must have observed a TOMBSTONE while trying 402 // to add a waiter. 403 return getDoneValue(value); 404 } 405 // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node 406 // on the waiters list 407 while (remainingNanos > 0) { 408 localValue = value; 409 if (localValue != null & !(localValue instanceof SetFuture)) { 410 return getDoneValue(localValue); 411 } 412 if (Thread.interrupted()) { 413 throw new InterruptedException(); 414 } 415 remainingNanos = endNanos - System.nanoTime(); 416 } 417 418 String futureToString = toString(); 419 final String unitString = unit.toString().toLowerCase(Locale.ROOT); 420 String message = "Waited " + timeout + " " + unit.toString().toLowerCase(Locale.ROOT); 421 // Only report scheduling delay if larger than our spin threshold - otherwise it's just 422 // noise 423 if (remainingNanos + SPIN_THRESHOLD_NANOS < 0) { 424 // We over-waited for our timeout. 425 message += " (plus "; 426 long overWaitNanos = -remainingNanos; 427 long overWaitUnits = unit.convert(overWaitNanos, TimeUnit.NANOSECONDS); 428 long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits); 429 boolean shouldShowExtraNanos = 430 overWaitUnits == 0 || overWaitLeftoverNanos > SPIN_THRESHOLD_NANOS; 431 if (overWaitUnits > 0) { 432 message += overWaitUnits + " " + unitString; 433 if (shouldShowExtraNanos) { 434 message += ","; 435 } 436 message += " "; 437 } 438 if (shouldShowExtraNanos) { 439 message += overWaitLeftoverNanos + " nanoseconds "; 440 } 441 442 message += "delay)"; 443 } 444 // It's confusing to see a completed future in a timeout message; if isDone() returns false, 445 // then we know it must have given a pending toString value earlier. If not, then the future 446 // completed after the timeout expired, and the message might be success. 447 if (isDone()) { 448 throw new TimeoutException(message + " but future completed as timeout expired"); 449 } 450 throw new TimeoutException(message + " for " + futureToString); 451 } 452 453 /** 454 * {@inheritDoc} 455 * 456 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} 457 * if the current thread is interrupted during the call, even if the value is already available. 458 * 459 * @throws CancellationException {@inheritDoc} 460 */ 461 @Override get()462 public final V get() throws InterruptedException, ExecutionException { 463 if (Thread.interrupted()) { 464 throw new InterruptedException(); 465 } 466 Object localValue = value; 467 if (localValue != null & !(localValue instanceof SetFuture)) { 468 return getDoneValue(localValue); 469 } 470 Waiter oldHead = waiters; 471 if (oldHead != Waiter.TOMBSTONE) { 472 Waiter node = new Waiter(); 473 do { 474 node.setNext(oldHead); 475 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 476 // we are on the stack, now wait for completion. 477 while (true) { 478 LockSupport.park(this); 479 // Check interruption first, if we woke up due to interruption we need to 480 // honor that. 481 if (Thread.interrupted()) { 482 removeWaiter(node); 483 throw new InterruptedException(); 484 } 485 // Otherwise re-read and check doneness. If we loop then it must have 486 // been a spurious 487 // wakeup 488 localValue = value; 489 if (localValue != null & !(localValue instanceof SetFuture)) { 490 return getDoneValue(localValue); 491 } 492 } 493 } 494 oldHead = waiters; // re-read and loop. 495 } while (oldHead != Waiter.TOMBSTONE); 496 } 497 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to 498 // add a waiter. 499 return getDoneValue(value); 500 } 501 502 /** Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. */ getDoneValue(Object obj)503 private V getDoneValue(Object obj) throws ExecutionException { 504 // While this seems like it might be too branch-y, simple benchmarking proves it to be 505 // unmeasurable (comparing done AbstractFutures with immediateFuture) 506 if (obj instanceof Cancellation) { 507 throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause); 508 } else if (obj instanceof Failure) { 509 throw new ExecutionException(((Failure) obj).exception); 510 } else if (obj == NULL) { 511 return null; 512 } else { 513 @SuppressWarnings("unchecked") // this is the only other option 514 V asV = (V) obj; 515 return asV; 516 } 517 } 518 519 @Override isDone()520 public final boolean isDone() { 521 final Object localValue = value; 522 return localValue != null & !(localValue instanceof SetFuture); 523 } 524 525 @Override isCancelled()526 public final boolean isCancelled() { 527 final Object localValue = value; 528 return localValue instanceof Cancellation; 529 } 530 531 /** 532 * {@inheritDoc} 533 * 534 * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been 535 * {@linkplain #setFuture set asynchronously}, then the cancellation will also be propagated 536 * to the delegate {@code Future} that was supplied in the {@code setFuture} call. 537 * 538 * <p>Rather than override this method to perform additional cancellation work or cleanup, 539 * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link 540 * #wasInterrupted} as necessary. This ensures that the work is done even if the future is 541 * cancelled without a call to {@code cancel}, such as by calling {@code 542 * setFuture(cancelledFuture)}. 543 */ 544 @Override cancel(boolean mayInterruptIfRunning)545 public final boolean cancel(boolean mayInterruptIfRunning) { 546 Object localValue = value; 547 boolean rValue = false; 548 if (localValue == null | localValue instanceof SetFuture) { 549 // Try to delay allocating the exception. At this point we may still lose the CAS, 550 // but it is certainly less likely. 551 Object valueToSet = 552 GENERATE_CANCELLATION_CAUSES 553 ? new Cancellation( 554 mayInterruptIfRunning, 555 new CancellationException("Future.cancel() was called.")) 556 : (mayInterruptIfRunning 557 ? Cancellation.CAUSELESS_INTERRUPTED 558 : Cancellation.CAUSELESS_CANCELLED); 559 AbstractFuture<?> abstractFuture = this; 560 while (true) { 561 if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { 562 rValue = true; 563 // We call interuptTask before calling complete(), which is consistent with 564 // FutureTask 565 if (mayInterruptIfRunning) { 566 abstractFuture.interruptTask(); 567 } 568 complete(abstractFuture); 569 if (localValue instanceof SetFuture) { 570 // propagate cancellation to the future set in setfuture, this is racy, 571 // and we don't 572 // care if we are successful or not. 573 ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future; 574 if (futureToPropagateTo instanceof AbstractFuture) { 575 // If the future is a trusted then we specifically avoid 576 // calling cancel() this has 2 benefits 577 // 1. for long chains of futures strung together with setFuture we 578 // consume less stack 579 // 2. we avoid allocating Cancellation objects at every level of the 580 // cancellation chain 581 // We can only do this for TrustedFuture, because TrustedFuture 582 // .cancel is final and does nothing but delegate to this method. 583 AbstractFuture<?> 584 trusted = (AbstractFuture<?>) futureToPropagateTo; 585 localValue = trusted.value; 586 if (localValue == null | localValue instanceof SetFuture) { 587 abstractFuture = trusted; 588 continue; // loop back up and try to complete the new future 589 } 590 } else { 591 // not a TrustedFuture, call cancel directly. 592 futureToPropagateTo.cancel(mayInterruptIfRunning); 593 } 594 } 595 break; 596 } 597 // obj changed, reread 598 localValue = abstractFuture.value; 599 if (!(localValue instanceof SetFuture)) { 600 // obj cannot be null at this point, because value can only change from null 601 // to non-null. So if value changed (and it did since we lost the CAS), 602 // then it cannot be null and since it isn't a SetFuture, then the future must 603 // be done and we should exit the loop 604 break; 605 } 606 } 607 } 608 return rValue; 609 } 610 611 /** 612 * Subclasses can override this method to implement interruption of the future's computation. 613 * The method is invoked automatically by a successful call to 614 * {@link #cancel(boolean) cancel(true)}. 615 * 616 * <p>The default implementation does nothing. 617 * 618 * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, checking 619 * {@link #wasInterrupted} to decide whether to interrupt your task. 620 * 621 * @since 10.0 622 */ interruptTask()623 protected void interruptTask() { 624 } 625 626 /** 627 * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code 628 * true}. 629 * 630 * @since 14.0 631 */ wasInterrupted()632 protected final boolean wasInterrupted() { 633 final Object localValue = value; 634 return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; 635 } 636 637 /** 638 * {@inheritDoc} 639 * 640 * @since 10.0 641 */ 642 @Override addListener(Runnable listener, Executor executor)643 public final void addListener(Runnable listener, Executor executor) { 644 checkNotNull(listener); 645 checkNotNull(executor); 646 Listener oldHead = listeners; 647 if (oldHead != Listener.TOMBSTONE) { 648 Listener newNode = new Listener(listener, executor); 649 do { 650 newNode.next = oldHead; 651 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { 652 return; 653 } 654 oldHead = listeners; // re-read 655 } while (oldHead != Listener.TOMBSTONE); 656 } 657 // If we get here then the Listener TOMBSTONE was set, which means the future is done, call 658 // the listener. 659 executeListener(listener, executor); 660 } 661 662 /** 663 * Sets the result of this {@code Future} unless this {@code Future} has already been 664 * cancelled or set (including {@linkplain #setFuture set asynchronously}). 665 * When a call to this method returns, the {@code Future} is guaranteed to be 666 * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which case it returns 667 * {@code true}). If it returns {@code false}, the {@code Future} may have previously been set 668 * asynchronously, in which case its result may not be known yet. That result, 669 * though not yet known, cannot be overridden by a call to a {@code set*} method, 670 * only by a call to {@link #cancel}. 671 * 672 * @param value the value to be used as the result 673 * @return true if the attempt was accepted, completing the {@code Future} 674 */ set(@ullable V value)675 protected boolean set(@Nullable V value) { 676 Object valueToSet = value == null ? NULL : value; 677 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 678 complete(this); 679 return true; 680 } 681 return false; 682 } 683 684 /** 685 * Sets the failed result of this {@code Future} unless this {@code Future} has already been 686 * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this 687 * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only 688 * if</b> 689 * the call was accepted (in which case it returns {@code true}). If it returns {@code 690 * false}, the 691 * {@code Future} may have previously been set asynchronously, in which case its result may 692 * not be 693 * known yet. That result, though not yet known, cannot be overridden by a call to a {@code 694 * set*} 695 * method, only by a call to {@link #cancel}. 696 * 697 * @param throwable the exception to be used as the failed result 698 * @return true if the attempt was accepted, completing the {@code Future} 699 */ setException(Throwable throwable)700 protected boolean setException(Throwable throwable) { 701 Object valueToSet = new Failure(checkNotNull(throwable)); 702 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 703 complete(this); 704 return true; 705 } 706 return false; 707 } 708 709 /** 710 * Sets the result of this {@code Future} to match the supplied input {@code Future} once the 711 * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set 712 * (including "set asynchronously," defined below). 713 * 714 * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the 715 * call is accepted, then this future is guaranteed to have been completed with the supplied 716 * future by the time this method returns. If the supplied future is not done and the call 717 * is accepted, then the future will be <i>set asynchronously</i>. Note that such a result, 718 * though not yet known, cannot be overridden by a call to a {@code set*} method, 719 * only by a call to {@link #cancel}. 720 * 721 * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later 722 * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to 723 * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code 724 * Future}. 725 * 726 * <p>Note that, even if the supplied future is cancelled and it causes this future to complete, 727 * it will never trigger interruption behavior. In particular, it will not cause this future to 728 * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not 729 * return {@code true}. 730 * 731 * @param future the future to delegate to 732 * @return true if the attempt was accepted, indicating that the {@code Future} was not 733 * previously cancelled or set. 734 * @since 19.0 735 */ setFuture(ListenableFuture<? extends V> future)736 protected boolean setFuture(ListenableFuture<? extends V> future) { 737 checkNotNull(future); 738 Object localValue = value; 739 if (localValue == null) { 740 if (future.isDone()) { 741 Object value = getFutureValue(future); 742 if (ATOMIC_HELPER.casValue(this, null, value)) { 743 complete(this); 744 return true; 745 } 746 return false; 747 } 748 SetFuture valueToSet = new SetFuture<V>(this, future); 749 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 750 // the listener is responsible for calling completeWithFuture, directExecutor is 751 // appropriate since all we are doing is unpacking a completed future 752 // which should be fast. 753 try { 754 future.addListener(valueToSet, DirectExecutor.INSTANCE); 755 } catch (Throwable t) { 756 // addListener has thrown an exception! SetFuture.run can't throw any 757 // exceptions so this must have been caused by addListener itself. 758 // The most likely explanation is a misconfigured mock. 759 // Try to switch to Failure. 760 Failure failure; 761 try { 762 failure = new Failure(t); 763 } catch (Throwable oomMostLikely) { 764 failure = Failure.FALLBACK_INSTANCE; 765 } 766 // Note: The only way this CAS could fail is if cancel() has raced with us. 767 // That is ok. 768 boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure); 769 } 770 return true; 771 } 772 localValue = value; // we lost the cas, fall through and maybe cancel 773 } 774 // The future has already been set to something. If it is cancellation we should cancel the 775 // incoming future. 776 if (localValue instanceof Cancellation) { 777 // we don't care if it fails, this is best-effort. 778 future.cancel(((Cancellation) localValue).wasInterrupted); 779 } 780 return false; 781 } 782 783 /** 784 * Returns a value that satisfies the contract of the {@link #value} field based on the state of 785 * given future. 786 * 787 * <p>This is approximately the inverse of {@link #getDoneValue(Object)} 788 */ 789 @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. getFutureValue(ListenableFuture<?> future)790 static Object getFutureValue(ListenableFuture<?> future) { 791 if (future instanceof AbstractFuture) { 792 // Break encapsulation for TrustedFuture instances since we know that subclasses cannot 793 // override .get() (since it is final) and therefore this is equivalent to calling 794 // .get() and unpacking the exceptions like we do below (just much faster because it is 795 // a single field read instead of a read, several branches and possibly 796 // creating exceptions). 797 Object v = ((AbstractFuture<?>) future).value; 798 if (v instanceof Cancellation) { 799 // If the other future was interrupted, clear the interrupted bit while 800 // preserving the cause this will make it consistent with how non-trustedfutures 801 // work which cannot propagate the wasInterrupted bit 802 Cancellation c = (Cancellation) v; 803 if (c.wasInterrupted) { 804 v = c.cause != null ? new Cancellation(/* wasInterrupted= */ false, c.cause) 805 : Cancellation.CAUSELESS_CANCELLED; 806 } 807 } 808 return v; 809 } 810 boolean wasCancelled = future.isCancelled(); 811 // Don't allocate a CancellationException if it's not necessary 812 if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) { 813 return Cancellation.CAUSELESS_CANCELLED; 814 } 815 // Otherwise calculate the value by calling .get() 816 try { 817 Object v = getUninterruptibly(future); 818 return v == null ? NULL : v; 819 } catch (ExecutionException exception) { 820 return new Failure(exception.getCause()); 821 } catch (CancellationException cancellation) { 822 if (!wasCancelled) { 823 return new Failure( 824 new IllegalArgumentException( 825 "get() threw CancellationException, despite reporting isCancelled" 826 + "() == false: " 827 + future, 828 cancellation)); 829 } 830 return new Cancellation(false, cancellation); 831 } catch (Throwable t) { 832 return new Failure(t); 833 } 834 } 835 836 /** 837 * internal dependency on other /util/concurrent classes. 838 */ getUninterruptibly(Future<V> future)839 private static <V> V getUninterruptibly(Future<V> future) throws ExecutionException { 840 boolean interrupted = false; 841 try { 842 while (true) { 843 try { 844 return future.get(); 845 } catch (InterruptedException e) { 846 interrupted = true; 847 } 848 } 849 } finally { 850 if (interrupted) { 851 Thread.currentThread().interrupt(); 852 } 853 } 854 } 855 856 /** Unblocks all threads and runs all listeners. */ 857 @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. complete(AbstractFuture<?> future)858 static void complete(AbstractFuture<?> future) { 859 Listener next = null; 860 outer: 861 while (true) { 862 future.releaseWaiters(); 863 // We call this before the listeners in order to avoid needing to manage a separate 864 // stack data structure for them. Also, some implementations rely on this running 865 // prior to listeners so that the cleanup work is visible to listeners. 866 // afterDone() should be generally fast and only used for cleanup work... but in 867 // theory can also be recursive and create StackOverflowErrors 868 future.afterDone(); 869 // push the current set of listeners onto next 870 next = future.clearListeners(next); 871 future = null; 872 while (next != null) { 873 Listener curr = next; 874 next = next.next; 875 Runnable task = curr.task; 876 if (task instanceof SetFuture) { 877 SetFuture<?> setFuture = (SetFuture<?>) task; 878 // We unwind setFuture specifically to avoid StackOverflowErrors in the case 879 // of long chains of SetFutures 880 // Handling this special case is important because there is no way to pass an 881 // executor to setFuture, so a user couldn't break the chain by doing this 882 // themselves. It is also potentially common if someone writes a recursive 883 // Futures.transformAsync transformer. 884 future = setFuture.owner; 885 if (future.value == setFuture) { 886 Object valueToSet = getFutureValue(setFuture.future); 887 if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) { 888 continue outer; 889 } 890 } 891 // other wise the future we were trying to set is already done. 892 } else { 893 executeListener(task, curr.executor); 894 } 895 } 896 break; 897 } 898 } 899 900 /** 901 * Callback method that is called exactly once after the future is completed. 902 * 903 * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it. 904 * 905 * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is 906 * intended for very lightweight cleanup work, for example, timing statistics or clearing 907 * fields. 908 * If your task does anything heavier consider, just using a listener with an executor. 909 * 910 * @since 20.0 911 */ afterDone()912 protected void afterDone() { 913 } 914 915 /** 916 * If this future has been cancelled (and possibly interrupted), cancels (and possibly 917 * interrupts) the given future (if available). 918 */ 919 @SuppressWarnings("ParameterNotNullable") maybePropagateCancellationTo(@ullable Future<?> related)920 final void maybePropagateCancellationTo(@Nullable Future<?> related) { 921 if (related != null & isCancelled()) { 922 related.cancel(wasInterrupted()); 923 } 924 } 925 926 /** Releases all threads in the {@link #waiters} list, and clears the list. */ releaseWaiters()927 private void releaseWaiters() { 928 Waiter head; 929 do { 930 head = waiters; 931 } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE)); 932 for (Waiter currentWaiter = head; currentWaiter != null; 933 currentWaiter = currentWaiter.next) { 934 currentWaiter.unpark(); 935 } 936 } 937 938 /** 939 * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently 940 * added first. 941 */ clearListeners(Listener onto)942 private Listener clearListeners(Listener onto) { 943 // We need to 944 // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to 945 // to synchronize with us 946 // 2. reverse the linked list, because despite our rather clear contract, people depend 947 // on us executing listeners in the order they were added 948 // 3. push all the items onto 'onto' and return the new head of the stack 949 Listener head; 950 do { 951 head = listeners; 952 } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE)); 953 Listener reversedList = onto; 954 while (head != null) { 955 Listener tmp = head; 956 head = head.next; 957 tmp.next = reversedList; 958 reversedList = tmp; 959 } 960 return reversedList; 961 } 962 963 // TODO(clm): move parts into a default method on ListenableFuture? 964 @Override toString()965 public String toString() { 966 StringBuilder builder = new StringBuilder().append(super.toString()).append("[status="); 967 if (isCancelled()) { 968 builder.append("CANCELLED"); 969 } else if (isDone()) { 970 addDoneString(builder); 971 } else { 972 String pendingDescription; 973 try { 974 pendingDescription = pendingToString(); 975 } catch (RuntimeException e) { 976 // Don't call getMessage or toString() on the exception, in case the exception 977 // thrown by the subclass is implemented with bugs similar to the subclass. 978 pendingDescription = "Exception thrown from implementation: " + e.getClass(); 979 } 980 // The future may complete during or before the call to getPendingToString, so we use 981 // null as a signal that we should try checking if the future is done again. 982 if (pendingDescription != null && !pendingDescription.isEmpty()) { 983 builder.append("PENDING, info=[").append(pendingDescription).append("]"); 984 } else if (isDone()) { 985 addDoneString(builder); 986 } else { 987 builder.append("PENDING"); 988 } 989 } 990 return builder.append("]").toString(); 991 } 992 993 /** 994 * Provide a human-readable explanation of why this future has not yet completed. 995 * 996 * @return null if an explanation cannot be provided because the future is done. 997 * @since 23.0 998 */ pendingToString()999 protected @Nullable String pendingToString() { 1000 Object localValue = value; 1001 if (localValue instanceof SetFuture) { 1002 return "setFuture=[" + userObjectToString(((SetFuture) localValue).future) + "]"; 1003 } else if (this instanceof ScheduledFuture) { 1004 return "remaining delay=[" 1005 + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS) 1006 + " ms]"; 1007 } 1008 return null; 1009 } 1010 addDoneString(StringBuilder builder)1011 private void addDoneString(StringBuilder builder) { 1012 try { 1013 V value = getUninterruptibly(this); 1014 builder.append("SUCCESS, result=[").append(userObjectToString(value)).append("]"); 1015 } catch (ExecutionException e) { 1016 builder.append("FAILURE, cause=[").append(e.getCause()).append("]"); 1017 } catch (CancellationException e) { 1018 builder.append("CANCELLED"); // shouldn't be reachable 1019 } catch (RuntimeException e) { 1020 builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]"); 1021 } 1022 } 1023 1024 /** Helper for printing user supplied objects into our toString method. */ userObjectToString(Object o)1025 private String userObjectToString(Object o) { 1026 // This is some basic recursion detection for when people create cycles via set/setFuture 1027 // This is however only partial protection though since it only detects self loops. We 1028 // could detect arbitrary cycles using a thread local or possibly by catching 1029 // StackOverflowExceptions but this should be a good enough solution 1030 // (it is also what jdk collections do in these cases) 1031 if (o == this) { 1032 return "this future"; 1033 } 1034 return String.valueOf(o); 1035 } 1036 1037 /** 1038 * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain 1039 * RuntimeException runtime exceptions} thrown by the executor. 1040 */ executeListener(Runnable runnable, Executor executor)1041 private static void executeListener(Runnable runnable, Executor executor) { 1042 try { 1043 executor.execute(runnable); 1044 } catch (RuntimeException e) { 1045 // Log it and keep going -- bad runnable and/or executor. Don't punish the other 1046 // runnables if we're given a bad one. We only catch RuntimeException 1047 // because we want Errors to propagate up. 1048 log.log( 1049 Level.SEVERE, 1050 "RuntimeException while executing runnable " + runnable + " with executor " 1051 + executor, 1052 e); 1053 } 1054 } 1055 1056 private abstract static class AtomicHelper { 1057 /** Non volatile write of the thread to the {@link Waiter#thread} field. */ putThread(Waiter waiter, Thread newValue)1058 abstract void putThread(Waiter waiter, Thread newValue); 1059 1060 /** Non volatile write of the waiter to the {@link Waiter#next} field. */ putNext(Waiter waiter, Waiter newValue)1061 abstract void putNext(Waiter waiter, Waiter newValue); 1062 1063 /** Performs a CAS operation on the {@link #waiters} field. */ casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update)1064 abstract boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update); 1065 1066 /** Performs a CAS operation on the {@link #listeners} field. */ casListeners(AbstractFuture<?> future, Listener expect, Listener update)1067 abstract boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update); 1068 1069 /** Performs a CAS operation on the {@link #value} field. */ casValue(AbstractFuture<?> future, Object expect, Object update)1070 abstract boolean casValue(AbstractFuture<?> future, Object expect, Object update); 1071 } 1072 1073 /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */ 1074 private static final class SafeAtomicHelper extends AtomicHelper { 1075 final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; 1076 final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; 1077 final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater; 1078 final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater; 1079 final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater; 1080 SafeAtomicHelper( AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater)1081 SafeAtomicHelper( 1082 AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, 1083 AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, 1084 AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, 1085 AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, 1086 AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) { 1087 this.waiterThreadUpdater = waiterThreadUpdater; 1088 this.waiterNextUpdater = waiterNextUpdater; 1089 this.waitersUpdater = waitersUpdater; 1090 this.listenersUpdater = listenersUpdater; 1091 this.valueUpdater = valueUpdater; 1092 } 1093 1094 @Override putThread(Waiter waiter, Thread newValue)1095 void putThread(Waiter waiter, Thread newValue) { 1096 waiterThreadUpdater.lazySet(waiter, newValue); 1097 } 1098 1099 @Override putNext(Waiter waiter, Waiter newValue)1100 void putNext(Waiter waiter, Waiter newValue) { 1101 waiterNextUpdater.lazySet(waiter, newValue); 1102 } 1103 1104 @Override casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update)1105 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1106 return waitersUpdater.compareAndSet(future, expect, update); 1107 } 1108 1109 @Override casListeners(AbstractFuture<?> future, Listener expect, Listener update)1110 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1111 return listenersUpdater.compareAndSet(future, expect, update); 1112 } 1113 1114 @Override casValue(AbstractFuture<?> future, Object expect, Object update)1115 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1116 return valueUpdater.compareAndSet(future, expect, update); 1117 } 1118 } 1119 1120 /** 1121 * {@link AtomicHelper} based on {@code synchronized} and volatile writes. 1122 * 1123 * <p>This is an implementation of last resort for when certain basic VM features are broken 1124 * (like AtomicReferenceFieldUpdater). 1125 */ 1126 private static final class SynchronizedHelper extends AtomicHelper { SynchronizedHelper()1127 SynchronizedHelper() { 1128 } 1129 1130 @Override putThread(Waiter waiter, Thread newValue)1131 void putThread(Waiter waiter, Thread newValue) { 1132 waiter.thread = newValue; 1133 } 1134 1135 @Override putNext(Waiter waiter, Waiter newValue)1136 void putNext(Waiter waiter, Waiter newValue) { 1137 waiter.next = newValue; 1138 } 1139 1140 @Override casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update)1141 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1142 synchronized (future) { 1143 if (future.waiters == expect) { 1144 future.waiters = update; 1145 return true; 1146 } 1147 return false; 1148 } 1149 } 1150 1151 @Override casListeners(AbstractFuture<?> future, Listener expect, Listener update)1152 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1153 synchronized (future) { 1154 if (future.listeners == expect) { 1155 future.listeners = update; 1156 return true; 1157 } 1158 return false; 1159 } 1160 } 1161 1162 @Override casValue(AbstractFuture<?> future, Object expect, Object update)1163 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1164 synchronized (future) { 1165 if (future.value == expect) { 1166 future.value = update; 1167 return true; 1168 } 1169 return false; 1170 } 1171 } 1172 } 1173 cancellationExceptionWithCause( @ullable String message, @Nullable Throwable cause)1174 private static CancellationException cancellationExceptionWithCause( 1175 @Nullable String message, @Nullable Throwable cause) { 1176 CancellationException exception = new CancellationException(message); 1177 exception.initCause(cause); 1178 return exception; 1179 } 1180 1181 @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. checkNotNull(@ullable T reference)1182 static <T> @NonNull T checkNotNull(@Nullable T reference) { 1183 if (reference == null) { 1184 throw new NullPointerException(); 1185 } 1186 return reference; 1187 } 1188 } 1189