/* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ package software.amazon.awssdk.utils; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.function.Function; import software.amazon.awssdk.annotations.SdkProtectedApi; /** * Utility class for working with {@link CompletableFuture}. */ @SdkProtectedApi public final class CompletableFutureUtils { private static final Logger log = Logger.loggerFor(CompletableFutureUtils.class); private CompletableFutureUtils() { } /** * Convenience method for creating a future that is immediately completed * exceptionally with the given {@code Throwable}. *

* Similar to {@code CompletableFuture#failedFuture} which was added in * Java 9. * * @param t The failure. * @param The type of the element. * @return The failed future. */ public static CompletableFuture failedFuture(Throwable t) { CompletableFuture cf = new CompletableFuture<>(); cf.completeExceptionally(t); return cf; } /** * Wraps the given error in a {@link CompletionException} if necessary. * Useful if an exception needs to be rethrown from within {@link * CompletableFuture#handle(java.util.function.BiFunction)} or similar * methods. * * @param t The error. * @return The error as a CompletionException. */ public static CompletionException errorAsCompletionException(Throwable t) { if (t instanceof CompletionException) { return (CompletionException) t; } return new CompletionException(t); } /** * Forward the {@code Throwable} from {@code src} to {@code dst}. * @param src The source of the {@code Throwable}. * @param dst The destination where the {@code Throwable} will be forwarded to. * * @return {@code src}. */ public static CompletableFuture forwardExceptionTo(CompletableFuture src, CompletableFuture dst) { src.whenComplete((r, e) -> { if (e != null) { dst.completeExceptionally(e); } }); return src; } /** * Forward the {@code Throwable} that can be transformed as per the transformationFunction * from {@code src} to {@code dst}. * @param src The source of the {@code Throwable}. * @param dst The destination where the {@code Throwable} will be forwarded to * @param transformationFunction Transformation function taht will be applied on to the forwarded exception. * @return */ public static CompletableFuture forwardTransformedExceptionTo(CompletableFuture src, CompletableFuture dst, Function transformationFunction) { src.whenComplete((r, e) -> { if (e != null) { dst.completeExceptionally(transformationFunction.apply(e)); } }); return src; } /** * Completes the {@code dst} future based on the result of the {@code src} future asynchronously on * the provided {@link Executor} and return the {@code src} future. * * @param src The source {@link CompletableFuture} * @param dst The destination where the {@code Throwable} or response will be forwarded to. * @return the {@code src} future. */ public static CompletableFuture forwardResultTo(CompletableFuture src, CompletableFuture dst) { src.whenComplete((r, e) -> { if (e != null) { dst.completeExceptionally(e); } else { dst.complete(r); } }); return src; } /** * Completes the {@code dst} future based on the result of the {@code src} future asynchronously on * the provided {@link Executor} and return the {@code src} future. * * @param src The source {@link CompletableFuture} * @param dst The destination where the {@code Throwable} or response will be forwarded to. * @param executor the executor to complete the des future * @return the {@code src} future. */ public static CompletableFuture forwardResultTo(CompletableFuture src, CompletableFuture dst, Executor executor) { src.whenCompleteAsync((r, e) -> { if (e != null) { dst.completeExceptionally(e); } else { dst.complete(r); } }, executor); return src; } /** * Completes the {@code dst} future based on the result of the {@code src} future, synchronously, * after applying the provided transformation {@link Function} if successful. * * @param src The source {@link CompletableFuture} * @param dst The destination where the {@code Throwable} or transformed result will be forwarded to. * @return the {@code src} future. */ public static CompletableFuture forwardTransformedResultTo(CompletableFuture src, CompletableFuture dst, Function function) { src.whenComplete((r, e) -> { if (e != null) { dst.completeExceptionally(e); } else { dst.complete(function.apply(r)); } }); return src; } /** * Similar to {@link CompletableFuture#allOf(CompletableFuture[])}, but * when any future is completed exceptionally, forwards the * exception to other futures. * * @param futures The futures. * @return The new future that is completed when all the futures in {@code * futures} are. */ public static CompletableFuture allOfExceptionForwarded(CompletableFuture[] futures) { CompletableFuture anyFail = anyFail(futures); anyFail.whenComplete((r, t) -> { if (t != null) { for (CompletableFuture cf : futures) { cf.completeExceptionally(t); } } }); return CompletableFuture.allOf(futures); } /** * Returns a new CompletableFuture that is completed when any of * the given CompletableFutures completes exceptionally. * * @param futures the CompletableFutures * @return a new CompletableFuture that is completed if any provided * future completed exceptionally. */ static CompletableFuture anyFail(CompletableFuture[] futures) { CompletableFuture completableFuture = new CompletableFuture<>(); for (CompletableFuture future : futures) { future.whenComplete((r, t) -> { if (t != null) { completableFuture.completeExceptionally(t); } }); } return completableFuture; } public static T joinInterruptibly(CompletableFuture future) { try { return future.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new CompletionException("Interrupted while waiting on a future.", e); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof Error) { throw (Error) cause; } throw new CompletionException(cause); } } public static void joinInterruptiblyIgnoringFailures(CompletableFuture future) { try { future.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { // Ignore } } /** * Joins (interruptibly) on the future, and re-throws any RuntimeExceptions or Errors just like the async task would have * thrown if it was executed synchronously. */ public static T joinLikeSync(CompletableFuture future) { try { return joinInterruptibly(future); } catch (CompletionException e) { Throwable cause = e.getCause(); if (cause instanceof RuntimeException) { // Make sure we don't lose the context of where the join is in the stack... cause.addSuppressed(new RuntimeException("Task failed.")); throw (RuntimeException) cause; } throw e; } } }