1 /* 2 * Copyright (C) 2021 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package android.car.util.concurrent; 18 19 import static com.android.car.internal.ExcludeFromCodeCoverageGeneratedReport.BOILERPLATE_CODE; 20 21 import android.annotation.CallSuper; 22 import android.annotation.NonNull; 23 import android.annotation.Nullable; 24 import android.car.annotation.AddedInOrBefore; 25 import android.os.Handler; 26 import android.os.Looper; 27 import android.os.Parcel; 28 import android.os.Parcelable; 29 import android.os.RemoteException; 30 import android.util.Log; 31 32 import com.android.car.internal.ExcludeFromCodeCoverageGeneratedReport; 33 import com.android.internal.annotations.GuardedBy; 34 import com.android.internal.util.Preconditions; 35 36 import java.lang.reflect.Constructor; 37 import java.util.concurrent.CancellationException; 38 import java.util.concurrent.CompletableFuture; 39 import java.util.concurrent.CompletionStage; 40 import java.util.concurrent.ExecutionException; 41 import java.util.concurrent.Executor; 42 import java.util.concurrent.TimeUnit; 43 import java.util.concurrent.TimeoutException; 44 import java.util.function.BiConsumer; 45 import java.util.function.BiFunction; 46 import java.util.function.Function; 47 import java.util.function.Supplier; 48 49 /** 50 * code copied from {@code com.android.internal.infra.AndroidFuture} 51 * 52 * @param <T> see {@link CompletableFuture} 53 * 54 * @hide 55 */ 56 public class AndroidFuture<T> extends CompletableFuture<T> implements Parcelable { 57 58 private static final boolean DEBUG = false; 59 private static final String LOG_TAG = AndroidFuture.class.getSimpleName(); 60 private static final Executor DIRECT_EXECUTOR = Runnable::run; 61 private static final StackTraceElement[] EMPTY_STACK_TRACE = new StackTraceElement[0]; 62 private static @Nullable Handler sMainHandler; 63 64 private final @NonNull Object mLock = new Object(); 65 @GuardedBy("mLock") 66 private @Nullable BiConsumer<? super T, ? super Throwable> mListener; 67 @GuardedBy("mLock") 68 private @Nullable Executor mListenerExecutor = DIRECT_EXECUTOR; 69 private @NonNull Handler mTimeoutHandler = getMainHandler(); 70 private final @Nullable IAndroidFuture mRemoteOrigin; 71 AndroidFuture()72 public AndroidFuture() { 73 super(); 74 mRemoteOrigin = null; 75 } 76 AndroidFuture(Parcel in)77 AndroidFuture(Parcel in) { 78 super(); 79 if (in.readBoolean()) { 80 // Done 81 if (in.readBoolean()) { 82 // Failed 83 completeExceptionally(readThrowable(in)); 84 } else { 85 // Success 86 complete((T) in.readValue(null)); 87 } 88 mRemoteOrigin = null; 89 } else { 90 // Not done 91 mRemoteOrigin = IAndroidFuture.Stub.asInterface(in.readStrongBinder()); 92 } 93 } 94 95 @NonNull getMainHandler()96 private static Handler getMainHandler() { 97 // This isn't thread-safe but we are okay with it. 98 if (sMainHandler == null) { 99 sMainHandler = new Handler(Looper.getMainLooper()); 100 } 101 return sMainHandler; 102 } 103 104 /** 105 * Create a completed future with the given value. 106 * 107 * @param value the value for the completed future 108 * @param <U> the type of the value 109 * @return the completed future 110 */ 111 @NonNull 112 @AddedInOrBefore(majorVersion = 33) completedFuture(U value)113 public static <U> AndroidFuture<U> completedFuture(U value) { 114 AndroidFuture<U> future = new AndroidFuture<>(); 115 future.complete(value); 116 return future; 117 } 118 119 @Override 120 @AddedInOrBefore(majorVersion = 33) complete(@ullable T value)121 public boolean complete(@Nullable T value) { 122 boolean changed = super.complete(value); 123 if (changed) { 124 onCompleted(value, null); 125 } 126 return changed; 127 } 128 129 @Override 130 @AddedInOrBefore(majorVersion = 33) completeExceptionally(@onNull Throwable ex)131 public boolean completeExceptionally(@NonNull Throwable ex) { 132 boolean changed = super.completeExceptionally(ex); 133 if (changed) { 134 onCompleted(null, ex); 135 } 136 return changed; 137 } 138 139 @Override 140 @AddedInOrBefore(majorVersion = 33) cancel(boolean mayInterruptIfRunning)141 public boolean cancel(boolean mayInterruptIfRunning) { 142 boolean changed = super.cancel(mayInterruptIfRunning); 143 if (changed) { 144 try { 145 get(); 146 throw new IllegalStateException("Expected CancellationException"); 147 } catch (CancellationException ex) { 148 onCompleted(null, ex); 149 } catch (Throwable e) { 150 throw new IllegalStateException("Expected CancellationException", e); 151 } 152 } 153 return changed; 154 } 155 156 @CallSuper 157 @AddedInOrBefore(majorVersion = 33) onCompleted(@ullable T res, @Nullable Throwable err)158 protected void onCompleted(@Nullable T res, @Nullable Throwable err) { 159 cancelTimeout(); 160 161 if (DEBUG) { 162 Log.i(LOG_TAG, this + " completed with result " + (err == null ? res : err), 163 new RuntimeException()); 164 } 165 166 BiConsumer<? super T, ? super Throwable> listener; 167 synchronized (mLock) { 168 listener = mListener; 169 mListener = null; 170 } 171 172 if (listener != null) { 173 callListenerAsync(listener, res, err); 174 } 175 176 if (mRemoteOrigin != null) { 177 try { 178 mRemoteOrigin.complete(this /* resultContainer */); 179 } catch (RemoteException e) { 180 Log.e(LOG_TAG, "Failed to propagate completion", e); 181 } 182 } 183 } 184 185 @Override 186 @AddedInOrBefore(majorVersion = 33) whenComplete(@onNull BiConsumer<? super T, ? super Throwable> action)187 public AndroidFuture<T> whenComplete(@NonNull BiConsumer<? super T, ? super Throwable> action) { 188 return whenCompleteAsync(action, DIRECT_EXECUTOR); 189 } 190 191 @Override 192 @AddedInOrBefore(majorVersion = 33) whenCompleteAsync( @onNull BiConsumer<? super T, ? super Throwable> action, @NonNull Executor executor)193 public AndroidFuture<T> whenCompleteAsync( 194 @NonNull BiConsumer<? super T, ? super Throwable> action, 195 @NonNull Executor executor) { 196 Preconditions.checkNotNull(action); 197 Preconditions.checkNotNull(executor); 198 synchronized (mLock) { 199 if (!isDone()) { 200 BiConsumer<? super T, ? super Throwable> oldListener = mListener; 201 202 if (oldListener != null && executor != mListenerExecutor) { 203 // 2 listeners with different executors 204 // Too complex - give up on saving allocations and delegate to superclass 205 super.whenCompleteAsync(action, executor); 206 return this; 207 } 208 209 mListenerExecutor = executor; 210 mListener = oldListener == null 211 ? action 212 : (res, err) -> { 213 callListener(oldListener, res, err); 214 callListener(action, res, err); 215 }; 216 return this; 217 } 218 } 219 220 // isDone() == true at this point 221 T res = null; 222 Throwable err = null; 223 try { 224 res = get(); 225 } catch (ExecutionException e) { 226 err = e.getCause(); 227 } catch (Throwable e) { 228 err = e; 229 } 230 callListenerAsync(action, res, err); 231 return this; 232 } 233 callListenerAsync(BiConsumer<? super T, ? super Throwable> listener, @Nullable T res, @Nullable Throwable err)234 private void callListenerAsync(BiConsumer<? super T, ? super Throwable> listener, 235 @Nullable T res, @Nullable Throwable err) { 236 if (mListenerExecutor == DIRECT_EXECUTOR) { 237 callListener(listener, res, err); 238 } else { 239 mListenerExecutor.execute(() -> callListener(listener, res, err)); 240 } 241 } 242 243 /** 244 * Calls the provided listener, handling any exceptions that may arise. 245 */ 246 // package-private to avoid synthetic method when called from lambda callListener( @onNull BiConsumer<? super TT, ? super Throwable> listener, @Nullable TT res, @Nullable Throwable err)247 static <TT> void callListener( 248 @NonNull BiConsumer<? super TT, ? super Throwable> listener, 249 @Nullable TT res, @Nullable Throwable err) { 250 try { 251 try { 252 listener.accept(res, err); 253 } catch (Throwable t) { 254 if (err == null) { 255 // listener happy-case threw, but exception case might not throw, so report the 256 // same exception thrown by listener's happy-path to it again 257 listener.accept(null, t); 258 } else { 259 // listener exception-case threw 260 // give up on listener but preserve the original exception when throwing up 261 t.addSuppressed(err); 262 throw t; 263 } 264 } 265 } catch (Throwable t2) { 266 // give up on listener and log the result & exception to logcat 267 Log.e(LOG_TAG, "Failed to call whenComplete listener. res = " + res, t2); 268 } 269 } 270 271 /** @inheritDoc */ 272 //@Override //TODO uncomment once java 9 APIs are exposed to frameworks 273 @AddedInOrBefore(majorVersion = 33) orTimeout(long timeout, @NonNull TimeUnit unit)274 public AndroidFuture<T> orTimeout(long timeout, @NonNull TimeUnit unit) { 275 mTimeoutHandler.postDelayed(this::triggerTimeout, this, unit.toMillis(timeout)); 276 return this; 277 } 278 triggerTimeout()279 void triggerTimeout() { 280 cancelTimeout(); 281 if (!isDone()) { 282 completeExceptionally(new TimeoutException()); 283 } 284 } 285 286 /** 287 * Cancel all timeouts previously set with {@link #orTimeout}, if any. 288 * 289 * @return {@code this} for chaining 290 */ 291 @AddedInOrBefore(majorVersion = 33) cancelTimeout()292 public AndroidFuture<T> cancelTimeout() { 293 mTimeoutHandler.removeCallbacksAndMessages(this); 294 return this; 295 } 296 297 /** 298 * Specifies the handler on which timeout is to be triggered 299 */ 300 @AddedInOrBefore(majorVersion = 33) setTimeoutHandler(@onNull Handler h)301 public AndroidFuture<T> setTimeoutHandler(@NonNull Handler h) { 302 cancelTimeout(); 303 mTimeoutHandler = Preconditions.checkNotNull(h); 304 return this; 305 } 306 307 @Override 308 @AddedInOrBefore(majorVersion = 33) thenCompose( @onNull Function<? super T, ? extends CompletionStage<U>> fn)309 public <U> AndroidFuture<U> thenCompose( 310 @NonNull Function<? super T, ? extends CompletionStage<U>> fn) { 311 return thenComposeAsync(fn, DIRECT_EXECUTOR); 312 } 313 314 @Override 315 @AddedInOrBefore(majorVersion = 33) thenComposeAsync( @onNull Function<? super T, ? extends CompletionStage<U>> fn, @NonNull Executor executor)316 public <U> AndroidFuture<U> thenComposeAsync( 317 @NonNull Function<? super T, ? extends CompletionStage<U>> fn, 318 @NonNull Executor executor) { 319 return new ThenComposeAsync<>(this, fn, executor); 320 } 321 322 private static class ThenComposeAsync<T, U> extends AndroidFuture<U> 323 implements BiConsumer<Object, Throwable>, Runnable { 324 private volatile T mSourceResult = null; 325 private final Executor mExecutor; 326 private volatile Function<? super T, ? extends CompletionStage<U>> mFn; 327 ThenComposeAsync(@onNull AndroidFuture<T> source, @NonNull Function<? super T, ? extends CompletionStage<U>> fn, @NonNull Executor executor)328 ThenComposeAsync(@NonNull AndroidFuture<T> source, 329 @NonNull Function<? super T, ? extends CompletionStage<U>> fn, 330 @NonNull Executor executor) { 331 mFn = Preconditions.checkNotNull(fn); 332 mExecutor = Preconditions.checkNotNull(executor); 333 334 // subscribe to first job completion 335 source.whenComplete(this); 336 } 337 338 @Override accept(Object res, Throwable err)339 public void accept(Object res, Throwable err) { 340 if (err != null) { 341 // first or second job failed 342 completeExceptionally(err); 343 } else if (mFn != null) { 344 // first job completed 345 mSourceResult = (T) res; 346 // subscribe to second job completion asynchronously 347 mExecutor.execute(this); 348 } else { 349 // second job completed 350 complete((U) res); 351 } 352 } 353 354 @Override run()355 public void run() { 356 CompletionStage<U> secondJob; 357 try { 358 secondJob = Preconditions.checkNotNull(mFn.apply(mSourceResult)); 359 } catch (Throwable t) { 360 completeExceptionally(t); 361 return; 362 } finally { 363 // Marks first job complete 364 mFn = null; 365 } 366 // subscribe to second job completion 367 secondJob.whenComplete(this); 368 } 369 } 370 371 @Override 372 @AddedInOrBefore(majorVersion = 33) thenApply(@onNull Function<? super T, ? extends U> fn)373 public <U> AndroidFuture<U> thenApply(@NonNull Function<? super T, ? extends U> fn) { 374 return thenApplyAsync(fn, DIRECT_EXECUTOR); 375 } 376 377 @Override 378 @AddedInOrBefore(majorVersion = 33) thenApplyAsync(@onNull Function<? super T, ? extends U> fn, @NonNull Executor executor)379 public <U> AndroidFuture<U> thenApplyAsync(@NonNull Function<? super T, ? extends U> fn, 380 @NonNull Executor executor) { 381 return new ThenApplyAsync<>(this, fn, executor); 382 } 383 384 private static class ThenApplyAsync<T, U> extends AndroidFuture<U> 385 implements BiConsumer<T, Throwable>, Runnable { 386 private volatile T mSourceResult = null; 387 private final Executor mExecutor; 388 private final Function<? super T, ? extends U> mFn; 389 ThenApplyAsync(@onNull AndroidFuture<T> source, @NonNull Function<? super T, ? extends U> fn, @NonNull Executor executor)390 ThenApplyAsync(@NonNull AndroidFuture<T> source, 391 @NonNull Function<? super T, ? extends U> fn, 392 @NonNull Executor executor) { 393 mExecutor = Preconditions.checkNotNull(executor); 394 mFn = Preconditions.checkNotNull(fn); 395 396 // subscribe to job completion 397 source.whenComplete(this); 398 } 399 400 @Override accept(T res, Throwable err)401 public void accept(T res, Throwable err) { 402 if (err != null) { 403 completeExceptionally(err); 404 } else { 405 mSourceResult = res; 406 mExecutor.execute(this); 407 } 408 } 409 410 @Override run()411 public void run() { 412 try { 413 complete(mFn.apply(mSourceResult)); 414 } catch (Throwable t) { 415 completeExceptionally(t); 416 } 417 } 418 } 419 420 @Override 421 @AddedInOrBefore(majorVersion = 33) thenCombine( @onNull CompletionStage<? extends U> other, @NonNull BiFunction<? super T, ? super U, ? extends V> combineResults)422 public <U, V> AndroidFuture<V> thenCombine( 423 @NonNull CompletionStage<? extends U> other, 424 @NonNull BiFunction<? super T, ? super U, ? extends V> combineResults) { 425 return new ThenCombine<T, U, V>(this, other, combineResults); 426 } 427 428 /** @see CompletionStage#thenCombine */ 429 @AddedInOrBefore(majorVersion = 33) thenCombine(@onNull CompletionStage<Void> other)430 public AndroidFuture<T> thenCombine(@NonNull CompletionStage<Void> other) { 431 return thenCombine(other, (res, aVoid) -> res); 432 } 433 434 private static class ThenCombine<T, U, V> extends AndroidFuture<V> 435 implements BiConsumer<Object, Throwable> { 436 private volatile @Nullable T mResultT = null; 437 private volatile @NonNull CompletionStage<? extends U> mSourceU; 438 private final @NonNull BiFunction<? super T, ? super U, ? extends V> mCombineResults; 439 ThenCombine(CompletableFuture<T> sourceT, CompletionStage<? extends U> sourceU, BiFunction<? super T, ? super U, ? extends V> combineResults)440 ThenCombine(CompletableFuture<T> sourceT, 441 CompletionStage<? extends U> sourceU, 442 BiFunction<? super T, ? super U, ? extends V> combineResults) { 443 mSourceU = Preconditions.checkNotNull(sourceU); 444 mCombineResults = Preconditions.checkNotNull(combineResults); 445 446 sourceT.whenComplete(this); 447 } 448 449 @Override accept(Object res, Throwable err)450 public void accept(Object res, Throwable err) { 451 if (err != null) { 452 completeExceptionally(err); 453 return; 454 } 455 456 if (mSourceU != null) { 457 // T done 458 mResultT = (T) res; 459 460 // Subscribe to the second job completion. 461 mSourceU.whenComplete((r, e) -> { 462 // Mark the first job completion by setting mSourceU to null, so that next time 463 // the execution flow goes to the else case below. 464 mSourceU = null; 465 accept(r, e); 466 }); 467 } else { 468 // U done 469 try { 470 complete(mCombineResults.apply(mResultT, (U) res)); 471 } catch (Throwable t) { 472 completeExceptionally(t); 473 } 474 } 475 } 476 } 477 478 /** 479 * Similar to {@link CompletableFuture#supplyAsync} but 480 * runs the given action directly. 481 * 482 * The resulting future is immediately completed. 483 */ 484 @AddedInOrBefore(majorVersion = 33) supply(Supplier<T> supplier)485 public static <T> AndroidFuture<T> supply(Supplier<T> supplier) { 486 return supplyAsync(supplier, DIRECT_EXECUTOR); 487 } 488 489 /** 490 * @see CompletableFuture#supplyAsync(Supplier, Executor) 491 */ 492 @AddedInOrBefore(majorVersion = 33) supplyAsync(Supplier<T> supplier, Executor executor)493 public static <T> AndroidFuture<T> supplyAsync(Supplier<T> supplier, Executor executor) { 494 return new SupplyAsync<>(supplier, executor); 495 } 496 497 private static class SupplyAsync<T> extends AndroidFuture<T> implements Runnable { 498 private final @NonNull Supplier<T> mSupplier; 499 SupplyAsync(Supplier<T> supplier, Executor executor)500 SupplyAsync(Supplier<T> supplier, Executor executor) { 501 mSupplier = supplier; 502 executor.execute(this); 503 } 504 505 @Override run()506 public void run() { 507 try { 508 complete(mSupplier.get()); 509 } catch (Throwable t) { 510 completeExceptionally(t); 511 } 512 } 513 } 514 515 @Override 516 @AddedInOrBefore(majorVersion = 33) writeToParcel(Parcel dest, int flags)517 public void writeToParcel(Parcel dest, int flags) { 518 boolean done = isDone(); 519 dest.writeBoolean(done); 520 if (done) { 521 T result; 522 try { 523 result = get(); 524 } catch (Throwable t) { 525 dest.writeBoolean(true); 526 writeThrowable(dest, unwrapExecutionException(t)); 527 return; 528 } 529 dest.writeBoolean(false); 530 dest.writeValue(result); 531 } else { 532 dest.writeStrongBinder(new IAndroidFuture.Stub() { 533 @Override 534 public void complete(AndroidFuture resultContainer) { 535 boolean changed; 536 try { 537 changed = AndroidFuture.this.complete((T) resultContainer.get()); 538 } catch (Throwable t) { 539 changed = completeExceptionally(unwrapExecutionException(t)); 540 } 541 if (!changed) { 542 Log.w(LOG_TAG, "Remote result " + resultContainer 543 + " ignored, as local future is already completed: " 544 + AndroidFuture.this); 545 } 546 } 547 }.asBinder()); 548 } 549 } 550 551 /** 552 * Exceptions coming out of {@link #get} are wrapped in {@link ExecutionException} 553 */ unwrapExecutionException(Throwable t)554 Throwable unwrapExecutionException(Throwable t) { 555 return t instanceof ExecutionException 556 ? t.getCause() 557 : t; 558 } 559 560 /** 561 * Alternative to {@link Parcel#writeException} that stores the stack trace, in a 562 * way consistent with the binder IPC exception propagation behavior. 563 */ writeThrowable(@onNull Parcel parcel, @Nullable Throwable throwable)564 private static void writeThrowable(@NonNull Parcel parcel, @Nullable Throwable throwable) { 565 boolean hasThrowable = throwable != null; 566 parcel.writeBoolean(hasThrowable); 567 if (!hasThrowable) { 568 return; 569 } 570 571 boolean isFrameworkParcelable = throwable instanceof Parcelable 572 && throwable.getClass().getClassLoader() == Parcelable.class.getClassLoader(); 573 parcel.writeBoolean(isFrameworkParcelable); 574 if (isFrameworkParcelable) { 575 parcel.writeParcelable((Parcelable) throwable, 576 Parcelable.PARCELABLE_WRITE_RETURN_VALUE); 577 return; 578 } 579 580 parcel.writeString(throwable.getClass().getName()); 581 parcel.writeString(throwable.getMessage()); 582 StackTraceElement[] stackTrace = throwable.getStackTrace(); 583 StringBuilder stackTraceBuilder = new StringBuilder(); 584 int truncatedStackTraceLength = Math.min(stackTrace != null ? stackTrace.length : 0, 5); 585 for (int i = 0; i < truncatedStackTraceLength; i++) { 586 if (i > 0) { 587 stackTraceBuilder.append('\n'); 588 } 589 stackTraceBuilder.append("\tat ").append(stackTrace[i]); 590 } 591 parcel.writeString(stackTraceBuilder.toString()); 592 writeThrowable(parcel, throwable.getCause()); 593 } 594 595 /** 596 * @see #writeThrowable 597 */ readThrowable(@onNull Parcel parcel)598 private static @Nullable Throwable readThrowable(@NonNull Parcel parcel) { 599 final boolean hasThrowable = parcel.readBoolean(); 600 if (!hasThrowable) { 601 return null; 602 } 603 604 boolean isFrameworkParcelable = parcel.readBoolean(); 605 if (isFrameworkParcelable) { 606 return parcel.readParcelable(Parcelable.class.getClassLoader()); 607 } 608 609 String className = parcel.readString(); 610 String message = parcel.readString(); 611 String stackTrace = parcel.readString(); 612 String messageWithStackTrace = message + '\n' + stackTrace; 613 Throwable throwable; 614 try { 615 Class<?> clazz = Class.forName(className, true, Parcelable.class.getClassLoader()); 616 if (Throwable.class.isAssignableFrom(clazz)) { 617 Constructor<?> constructor = clazz.getConstructor(String.class); 618 throwable = (Throwable) constructor.newInstance(messageWithStackTrace); 619 } else { 620 android.util.EventLog.writeEvent(0x534e4554, "186530450", -1, ""); 621 throwable = new RuntimeException(className + ": " + messageWithStackTrace); 622 } 623 } catch (Throwable t) { 624 throwable = new RuntimeException(className + ": " + messageWithStackTrace); 625 throwable.addSuppressed(t); 626 } 627 throwable.setStackTrace(EMPTY_STACK_TRACE); 628 Throwable cause = readThrowable(parcel); 629 if (cause != null) { 630 throwable.initCause(cause); 631 } 632 return throwable; 633 } 634 635 @Override 636 @ExcludeFromCodeCoverageGeneratedReport(reason = BOILERPLATE_CODE) 637 @AddedInOrBefore(majorVersion = 33) describeContents()638 public int describeContents() { 639 return 0; 640 } 641 642 @AddedInOrBefore(majorVersion = 33) 643 public static final @NonNull Parcelable.Creator<AndroidFuture> CREATOR = 644 new Parcelable.Creator<AndroidFuture>() { 645 public AndroidFuture createFromParcel(Parcel parcel) { 646 return new AndroidFuture(parcel); 647 } 648 649 public AndroidFuture[] newArray(int size) { 650 return new AndroidFuture[size]; 651 } 652 }; 653 } 654