1 /* 2 * Copyright 2022 Google LLC 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.google.android.libraries.mobiledatadownload.internal.util; 17 18 import static com.google.common.util.concurrent.Futures.immediateFuture; 19 20 import com.google.android.libraries.mobiledatadownload.internal.annotations.SequentialControlExecutor; 21 import com.google.android.libraries.mobiledatadownload.tracing.PropagatedFutures; 22 import com.google.common.base.Function; 23 import com.google.common.util.concurrent.ListenableFuture; 24 import com.google.errorprone.annotations.CanIgnoreReturnValue; 25 import java.util.ArrayList; 26 import java.util.List; 27 import java.util.concurrent.Executor; 28 29 /** Utilities for manipulating futures. */ 30 public final class FuturesUtil { 31 32 private final Executor sequentialExecutor; 33 FuturesUtil(@equentialControlExecutor Executor sequentialExecutor)34 public FuturesUtil(@SequentialControlExecutor Executor sequentialExecutor) { 35 this.sequentialExecutor = sequentialExecutor; 36 } 37 38 /** 39 * Returns a SequentialFutureChain which can takes a number of asynchronous operation and turns 40 * them into a single asynchronous operation. 41 * 42 * <p>SequentialFutureChain provides a clearer way of writing a common idiom used to sequence a 43 * number of asynchrounous operations. The fragment 44 * 45 * <pre>{@code 46 * ListenableFuture<T> future = immediateFuture(init); 47 * future = transformAsync(future, arg -> asyncOp1, sequentialExecutor); 48 * future = transform(future, arg -> op2, sequentialExecutor); 49 * future = transformAsync(future, arg -> asyncOp3, sequentialExecutor); 50 * return future; 51 * }</pre> 52 * 53 * <p>can be rewritten as 54 * 55 * <pre>{@code 56 * return new FuturesUtil(sequentialExecutor) 57 * .newSequentialChain(init) 58 * .chainAsync(arg -> asyncOp1) 59 * .chain(arg -> op2) 60 * .chainAsync(arg -> asyncOp3) 61 * .start(); 62 * }</pre> 63 * 64 * <p>If any intermediate operation raises an exception, the whole chain raises an exception. 65 * 66 * <p>Note that sequentialExecutor must be a sequential executor, i.e. provide the sequentiality 67 * guarantees provided by {@link com.google.common.util.concurrent.SequentialExecutor}. 68 */ newSequentialChain(T init)69 public <T> SequentialFutureChain<T> newSequentialChain(T init) { 70 return new SequentialFutureChain<>(init); 71 } 72 73 /** 74 * Create a SequentialFutureChain that doesn't compute a result. 75 * 76 * <p>If any intermediate operation raises an exception, the whole chain raises an exception. 77 * 78 * <p>Note that sequentialExecutor must be a sequential executor, i.e. provide the sequentiality 79 * guarantees provided by {@link com.google.common.util.concurrent.SequentialExecutor}. 80 */ newSequentialChain()81 public SequentialFutureChain<Void> newSequentialChain() { 82 return new SequentialFutureChain<>(null); 83 } 84 85 /** Builds a list of Futurse to be executed sequentially. */ 86 public final class SequentialFutureChain<T> { 87 private final List<FutureChainElement<T>> operations; 88 private final T init; 89 SequentialFutureChain(T init)90 private SequentialFutureChain(T init) { 91 this.operations = new ArrayList<>(); 92 this.init = init; 93 } 94 95 @CanIgnoreReturnValue chain(Function<T, T> operation)96 public SequentialFutureChain<T> chain(Function<T, T> operation) { 97 operations.add(new DirectFutureChainElement<>(operation)); 98 return this; 99 } 100 101 @CanIgnoreReturnValue chainAsync(Function<T, ListenableFuture<T>> operation)102 public SequentialFutureChain<T> chainAsync(Function<T, ListenableFuture<T>> operation) { 103 operations.add(new AsyncFutureChainElement<>(operation)); 104 return this; 105 } 106 start()107 public ListenableFuture<T> start() { 108 ListenableFuture<T> result = immediateFuture(init); 109 for (FutureChainElement<T> operation : operations) { 110 result = operation.apply(result); 111 } 112 return result; 113 } 114 } 115 116 private interface FutureChainElement<T> { apply(ListenableFuture<T> input)117 abstract ListenableFuture<T> apply(ListenableFuture<T> input); 118 } 119 120 private final class DirectFutureChainElement<T> implements FutureChainElement<T> { 121 private final Function<T, T> operation; 122 DirectFutureChainElement(Function<T, T> operation)123 private DirectFutureChainElement(Function<T, T> operation) { 124 this.operation = operation; 125 } 126 127 @Override apply(ListenableFuture<T> input)128 public ListenableFuture<T> apply(ListenableFuture<T> input) { 129 return PropagatedFutures.transform(input, operation, sequentialExecutor); 130 } 131 } 132 133 private final class AsyncFutureChainElement<T> implements FutureChainElement<T> { 134 private final Function<T, ListenableFuture<T>> operation; 135 AsyncFutureChainElement(Function<T, ListenableFuture<T>> operation)136 private AsyncFutureChainElement(Function<T, ListenableFuture<T>> operation) { 137 this.operation = operation; 138 } 139 140 @Override apply(ListenableFuture<T> input)141 public ListenableFuture<T> apply(ListenableFuture<T> input) { 142 return PropagatedFutures.transformAsync(input, operation::apply, sequentialExecutor); 143 } 144 } 145 } 146