/*
 * Copyright 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.android.libraries.mobiledatadownload.internal.util;

import static com.google.common.util.concurrent.Futures.immediateFuture;

import com.google.android.libraries.mobiledatadownload.internal.annotations.SequentialControlExecutor;
import com.google.android.libraries.mobiledatadownload.tracing.PropagatedFutures;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Utilities for manipulating futures. */
public final class FuturesUtil {

  /** Executor on which every chained operation is scheduled. */
  private final Executor sequentialExecutor;

  /**
   * Creates a utility instance bound to the given executor.
   *
   * @param sequentialExecutor executor used to run every operation of the chains created by this
   *     instance
   */
  public FuturesUtil(@SequentialControlExecutor Executor sequentialExecutor) {
    this.sequentialExecutor = sequentialExecutor;
  }

  /**
   * Returns a SequentialFutureChain which takes a number of asynchronous operations and turns them
   * into a single asynchronous operation.
   *
   * <p>SequentialFutureChain provides a clearer way of writing a common idiom used to sequence a
   * number of asynchronous operations. The fragment
   *
   * <pre>{@code
   * ListenableFuture<T> future = immediateFuture(init);
   * future = transformAsync(future, arg -> asyncOp1, sequentialExecutor);
   * future = transform(future, arg -> op2, sequentialExecutor);
   * future = transformAsync(future, arg -> asyncOp3, sequentialExecutor);
   * return future;
   * }</pre>
   *
   * <p>can be rewritten as
   *
   * <pre>{@code
   * return new FuturesUtil(sequentialExecutor)
   *     .newSequentialChain(init)
   *     .chainAsync(arg -> asyncOp1)
   *     .chain(arg -> op2)
   *     .chainAsync(arg -> asyncOp3)
   *     .start();
   * }</pre>
   *
   * <p>If any intermediate operation raises an exception, the whole chain raises an exception.
   *
   * <p>Note that sequentialExecutor must be a sequential executor, i.e. provide the sequentiality
   * guarantees provided by {@link com.google.common.util.concurrent.SequentialExecutor}.
   *
   * @param init value handed to the first operation of the chain
   * @param <T> type of the value threaded through the chain
   * @return a new, empty chain seeded with {@code init}
   */
  public <T> SequentialFutureChain<T> newSequentialChain(T init) {
    return new SequentialFutureChain<>(init);
  }

  /**
   * Creates a SequentialFutureChain that doesn't compute a result; the chain is seeded with {@code
   * null}.
   *
   * <p>If any intermediate operation raises an exception, the whole chain raises an exception.
   *
   * <p>Note that sequentialExecutor must be a sequential executor, i.e. provide the sequentiality
   * guarantees provided by {@link com.google.common.util.concurrent.SequentialExecutor}.
   *
   * @return a new, empty chain whose operations receive and return {@code Void}
   */
  public SequentialFutureChain<Void> newSequentialChain() {
    return new SequentialFutureChain<>(null);
  }

  /**
   * Builds a list of futures to be executed sequentially.
   *
   * @param <T> type of the value passed from one operation to the next
   */
  public final class SequentialFutureChain<T> {
    /** Operations in the order they were added; applied in that order by {@link #start()}. */
    private final List<FutureChainElement<T>> operations;

    /** Value wrapped in an immediate future and fed to the first operation. */
    private final T init;

    private SequentialFutureChain(T init) {
      this.operations = new ArrayList<>();
      this.init = init;
    }

    /**
     * Appends a synchronous operation to the chain.
     *
     * @param operation function applied to the previous operation's result
     * @return this chain, to allow fluent calls
     */
    @CanIgnoreReturnValue
    public SequentialFutureChain<T> chain(Function<T, T> operation) {
      operations.add(new DirectFutureChainElement<>(operation));
      return this;
    }

    /**
     * Appends an asynchronous operation to the chain.
     *
     * @param operation function applied to the previous operation's result, returning a future for
     *     the next value
     * @return this chain, to allow fluent calls
     */
    @CanIgnoreReturnValue
    public SequentialFutureChain<T> chainAsync(Function<T, ListenableFuture<T>> operation) {
      operations.add(new AsyncFutureChainElement<>(operation));
      return this;
    }

    /**
     * Links every operation added so far, in insertion order, onto an immediate future holding the
     * initial value.
     *
     * @return the future produced by the last operation, or an immediate future of the initial
     *     value when no operation was added
     */
    public ListenableFuture<T> start() {
      ListenableFuture<T> result = immediateFuture(init);
      for (FutureChainElement<T> operation : operations) {
        result = operation.apply(result);
      }
      return result;
    }
  }

  /** One step of a chain: derives the next future from the previous one. */
  private interface FutureChainElement<T> {
    ListenableFuture<T> apply(ListenableFuture<T> input);
  }

  /** Chain step wrapping a synchronous function. */
  private final class DirectFutureChainElement<T> implements FutureChainElement<T> {
    private final Function<T, T> operation;

    private DirectFutureChainElement(Function<T, T> operation) {
      this.operation = operation;
    }

    @Override
    public ListenableFuture<T> apply(ListenableFuture<T> input) {
      return PropagatedFutures.transform(input, operation, sequentialExecutor);
    }
  }

  /** Chain step wrapping a function that itself returns a future. */
  private final class AsyncFutureChainElement<T> implements FutureChainElement<T> {
    private final Function<T, ListenableFuture<T>> operation;

    private AsyncFutureChainElement(Function<T, ListenableFuture<T>> operation) {
      this.operation = operation;
    }

    @Override
    public ListenableFuture<T> apply(ListenableFuture<T> input) {
      // The method reference adapts the Guava Function to the functional type expected by
      // transformAsync (presumably Guava's AsyncFunction -- confirm against PropagatedFutures).
      return PropagatedFutures.transformAsync(input, operation::apply, sequentialExecutor);
    }
  }
}