1 /* 2 * Copyright (C) 2014 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import com.google.errorprone.annotations.CanIgnoreReturnValue; 22 import java.util.concurrent.CancellationException; 23 import java.util.concurrent.ExecutionException; 24 import java.util.concurrent.Executor; 25 import java.util.concurrent.TimeUnit; 26 import java.util.concurrent.TimeoutException; 27 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 28 import org.checkerframework.checker.nullness.qual.Nullable; 29 30 /** Utilities for the AbstractFutureBenchmarks */ 31 final class AbstractFutureBenchmarks { AbstractFutureBenchmarks()32 private AbstractFutureBenchmarks() {} 33 34 interface Facade<T> extends ListenableFuture<T> { 35 @CanIgnoreReturnValue set(T t)36 boolean set(T t); 37 38 @CanIgnoreReturnValue setException(Throwable t)39 boolean setException(Throwable t); 40 } 41 42 private static class NewAbstractFutureFacade<T> extends AbstractFuture<T> implements Facade<T> { 43 @CanIgnoreReturnValue 44 @Override set(T t)45 public boolean set(T t) { 46 return super.set(t); 47 } 48 49 @CanIgnoreReturnValue 50 @Override setException(Throwable t)51 public boolean setException(Throwable t) { 52 return super.setException(t); 53 } 54 } 55 56 private static class OldAbstractFutureFacade<T> extends OldAbstractFuture<T> 57 implements Facade<T> { 58 @CanIgnoreReturnValue 59 @Override set(T t)60 public boolean set(T t) { 61 return super.set(t); 62 } 63 64 @CanIgnoreReturnValue 65 @Override setException(Throwable t)66 public boolean setException(Throwable t) { 67 return super.setException(t); 68 } 69 } 70 71 enum Impl { 72 NEW { 73 @Override newFacade()74 <T> Facade<T> newFacade() { 75 return new NewAbstractFutureFacade<T>(); 76 } 77 }, 78 OLD { 79 @Override newFacade()80 <T> Facade<T> newFacade() { 81 return new OldAbstractFutureFacade<T>(); 82 } 83 }; 84 newFacade()85 abstract <T> Facade<T> newFacade(); 86 } 87 awaitWaiting(Thread t)88 static void awaitWaiting(Thread t) { 89 while (true) { 90 Thread.State state = t.getState(); 91 switch (state) { 92 case RUNNABLE: 93 case BLOCKED: 94 Thread.yield(); 95 break; 96 case WAITING: 97 return; 98 default: 99 throw new AssertionError("unexpected state: " + state); 100 } 101 } 102 } 103 104 abstract static class OldAbstractFuture<V> implements ListenableFuture<V> { 105 106 /** Synchronization control for AbstractFutures. */ 107 private final Sync<V> sync = new Sync<V>(); 108 109 // The execution list to hold our executors. 110 private final ExecutionList executionList = new ExecutionList(); 111 112 /** Constructor for use by subclasses. */ OldAbstractFuture()113 protected OldAbstractFuture() {} 114 115 /* 116 * Improve the documentation of when InterruptedException is thrown. Our 117 * behavior matches the JDK's, but the JDK's documentation is misleading. 118 */ 119 /** 120 * {@inheritDoc} 121 * 122 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if 123 * the current thread is interrupted before or during the call, even if the value is already 124 * available. 125 * 126 * @throws InterruptedException if the current thread was interrupted before or during the call 127 * (optional but recommended). 128 * @throws CancellationException {@inheritDoc} 129 */ 130 @CanIgnoreReturnValue 131 @Override get(long timeout, TimeUnit unit)132 public V get(long timeout, TimeUnit unit) 133 throws InterruptedException, TimeoutException, ExecutionException { 134 return sync.get(unit.toNanos(timeout)); 135 } 136 137 /* 138 * Improve the documentation of when InterruptedException is thrown. Our 139 * behavior matches the JDK's, but the JDK's documentation is misleading. 140 */ 141 /** 142 * {@inheritDoc} 143 * 144 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if 145 * the current thread is interrupted before or during the call, even if the value is already 146 * available. 147 * 148 * @throws InterruptedException if the current thread was interrupted before or during the call 149 * (optional but recommended). 150 * @throws CancellationException {@inheritDoc} 151 */ 152 @CanIgnoreReturnValue 153 @Override get()154 public V get() throws InterruptedException, ExecutionException { 155 return sync.get(); 156 } 157 158 @Override isDone()159 public boolean isDone() { 160 return sync.isDone(); 161 } 162 163 @Override isCancelled()164 public boolean isCancelled() { 165 return sync.isCancelled(); 166 } 167 168 @CanIgnoreReturnValue 169 @Override cancel(boolean mayInterruptIfRunning)170 public boolean cancel(boolean mayInterruptIfRunning) { 171 if (!sync.cancel(mayInterruptIfRunning)) { 172 return false; 173 } 174 executionList.execute(); 175 if (mayInterruptIfRunning) { 176 interruptTask(); 177 } 178 return true; 179 } 180 181 /** 182 * Subclasses can override this method to implement interruption of the future's computation. 183 * The method is invoked automatically by a successful call to {@link #cancel(boolean) 184 * cancel(true)}. 185 * 186 * <p>The default implementation does nothing. 187 * 188 * @since 10.0 189 */ interruptTask()190 protected void interruptTask() {} 191 192 /** 193 * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code 194 * true}. 195 * 196 * @since 14.0 197 */ wasInterrupted()198 protected final boolean wasInterrupted() { 199 return sync.wasInterrupted(); 200 } 201 202 /** 203 * {@inheritDoc} 204 * 205 * @since 10.0 206 */ 207 @Override addListener(Runnable listener, Executor exec)208 public void addListener(Runnable listener, Executor exec) { 209 executionList.add(listener, exec); 210 } 211 212 /** 213 * Subclasses should invoke this method to set the result of the computation to {@code value}. 214 * This will set the state of the future to {@link OldAbstractFuture.Sync#COMPLETED} and invoke 215 * the listeners if the state was successfully changed. 216 * 217 * @param value the value that was the result of the task. 218 * @return true if the state was successfully changed. 219 */ 220 @CanIgnoreReturnValue set(@ullable V value)221 protected boolean set(@Nullable V value) { 222 boolean result = sync.set(value); 223 if (result) { 224 executionList.execute(); 225 } 226 return result; 227 } 228 229 /** 230 * Subclasses should invoke this method to set the result of the computation to an error, {@code 231 * throwable}. This will set the state of the future to {@link OldAbstractFuture.Sync#COMPLETED} 232 * and invoke the listeners if the state was successfully changed. 233 * 234 * @param throwable the exception that the task failed with. 235 * @return true if the state was successfully changed. 236 */ 237 @CanIgnoreReturnValue setException(Throwable throwable)238 protected boolean setException(Throwable throwable) { 239 boolean result = sync.setException(checkNotNull(throwable)); 240 if (result) { 241 executionList.execute(); 242 } 243 return result; 244 } 245 246 /** 247 * Following the contract of {@link AbstractQueuedSynchronizer} we create a private subclass to 248 * hold the synchronizer. This synchronizer is used to implement the blocking and waiting calls 249 * as well as to handle state changes in a thread-safe manner. The current state of the future 250 * is held in the Sync state, and the lock is released whenever the state changes to {@link 251 * #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED} 252 * 253 * <p>To avoid races between threads doing release and acquire, we transition to the final state 254 * in two steps. One thread will successfully CAS from RUNNING to COMPLETING, that thread will 255 * then set the result of the computation, and only then transition to COMPLETED, CANCELLED, or 256 * INTERRUPTED. 257 * 258 * <p>We don't use the integer argument passed between acquire methods so we pass around a -1 259 * everywhere. 260 */ 261 static final class Sync<V> extends AbstractQueuedSynchronizer { 262 263 private static final long serialVersionUID = 0L; 264 265 /* Valid states. */ 266 static final int RUNNING = 0; 267 static final int COMPLETING = 1; 268 static final int COMPLETED = 2; 269 static final int CANCELLED = 4; 270 static final int INTERRUPTED = 8; 271 272 private V value; 273 private Throwable exception; 274 275 /* 276 * Acquisition succeeds if the future is done, otherwise it fails. 277 */ 278 @Override tryAcquireShared(int ignored)279 protected int tryAcquireShared(int ignored) { 280 if (isDone()) { 281 return 1; 282 } 283 return -1; 284 } 285 286 /* 287 * We always allow a release to go through, this means the state has been 288 * successfully changed and the result is available. 289 */ 290 @Override tryReleaseShared(int finalState)291 protected boolean tryReleaseShared(int finalState) { 292 setState(finalState); 293 return true; 294 } 295 296 /** 297 * Blocks until the task is complete or the timeout expires. Throws a {@link TimeoutException} 298 * if the timer expires, otherwise behaves like {@link #get()}. 299 */ get(long nanos)300 V get(long nanos) 301 throws TimeoutException, CancellationException, ExecutionException, InterruptedException { 302 303 // Attempt to acquire the shared lock with a timeout. 304 if (!tryAcquireSharedNanos(-1, nanos)) { 305 throw new TimeoutException("Timeout waiting for task."); 306 } 307 308 return getValue(); 309 } 310 311 /** 312 * Blocks until {@link #complete(Object, Throwable, int)} has been successfully called. Throws 313 * a {@link CancellationException} if the task was cancelled, or a {@link ExecutionException} 314 * if the task completed with an error. 315 */ get()316 V get() throws CancellationException, ExecutionException, InterruptedException { 317 318 // Acquire the shared lock allowing interruption. 319 acquireSharedInterruptibly(-1); 320 return getValue(); 321 } 322 323 /** 324 * Implementation of the actual value retrieval. Will return the value on success, an 325 * exception on failure, a cancellation on cancellation, or an illegal state if the 326 * synchronizer is in an invalid state. 327 */ getValue()328 private V getValue() throws CancellationException, ExecutionException { 329 int state = getState(); 330 switch (state) { 331 case COMPLETED: 332 if (exception != null) { 333 throw new ExecutionException(exception); 334 } else { 335 return value; 336 } 337 338 case CANCELLED: 339 case INTERRUPTED: 340 throw cancellationExceptionWithCause("Task was cancelled.", exception); 341 342 default: 343 throw new IllegalStateException("Error, synchronizer in invalid state: " + state); 344 } 345 } 346 347 /** Checks if the state is {@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}. */ isDone()348 boolean isDone() { 349 return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0; 350 } 351 352 /** Checks if the state is {@link #CANCELLED} or {@link #INTERRUPTED}. */ isCancelled()353 boolean isCancelled() { 354 return (getState() & (CANCELLED | INTERRUPTED)) != 0; 355 } 356 357 /** Checks if the state is {@link #INTERRUPTED}. */ wasInterrupted()358 boolean wasInterrupted() { 359 return getState() == INTERRUPTED; 360 } 361 362 /** Transition to the COMPLETED state and set the value. */ set(@ullable V v)363 boolean set(@Nullable V v) { 364 return complete(v, null, COMPLETED); 365 } 366 367 /** Transition to the COMPLETED state and set the exception. */ setException(Throwable t)368 boolean setException(Throwable t) { 369 return complete(null, t, COMPLETED); 370 } 371 372 /** Transition to the CANCELLED or INTERRUPTED state. */ cancel(boolean interrupt)373 boolean cancel(boolean interrupt) { 374 return complete(null, null, interrupt ? INTERRUPTED : CANCELLED); 375 } 376 377 /** 378 * Implementation of completing a task. Either {@code v} or {@code t} will be set but not 379 * both. The {@code finalState} is the state to change to from {@link #RUNNING}. If the state 380 * is not in the RUNNING state we return {@code false} after waiting for the state to be set 381 * to a valid final state ({@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}). 382 * 383 * @param v the value to set as the result of the computation. 384 * @param t the exception to set as the result of the computation. 385 * @param finalState the state to transition to. 386 */ complete(@ullable V v, @Nullable Throwable t, int finalState)387 private boolean complete(@Nullable V v, @Nullable Throwable t, int finalState) { 388 boolean doCompletion = compareAndSetState(RUNNING, COMPLETING); 389 if (doCompletion) { 390 // If this thread successfully transitioned to COMPLETING, set the value 391 // and exception and then release to the final state. 392 this.value = v; 393 // Don't actually construct a CancellationException until necessary. 394 this.exception = 395 ((finalState & (CANCELLED | INTERRUPTED)) != 0) 396 ? new CancellationException("Future.cancel() was called.") 397 : t; 398 releaseShared(finalState); 399 } else if (getState() == COMPLETING) { 400 // If some other thread is currently completing the future, block until 401 // they are done so we can guarantee completion. 402 acquireShared(-1); 403 } 404 return doCompletion; 405 } 406 } 407 cancellationExceptionWithCause( @ullable String message, @Nullable Throwable cause)408 static final CancellationException cancellationExceptionWithCause( 409 @Nullable String message, @Nullable Throwable cause) { 410 CancellationException exception = new CancellationException(message); 411 exception.initCause(cause); 412 return exception; 413 } 414 } 415 } 416