• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2022 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.google.android.libraries.mobiledatadownload.internal.util;
17 
18 import static com.google.common.util.concurrent.Futures.immediateFuture;
19 
20 import com.google.android.libraries.mobiledatadownload.internal.annotations.SequentialControlExecutor;
21 import com.google.android.libraries.mobiledatadownload.tracing.PropagatedFutures;
22 import com.google.common.base.Function;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.errorprone.annotations.CanIgnoreReturnValue;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.Executor;
28 
29 /** Utilities for manipulating futures. */
30 public final class FuturesUtil {
31 
32   private final Executor sequentialExecutor;
33 
FuturesUtil(@equentialControlExecutor Executor sequentialExecutor)34   public FuturesUtil(@SequentialControlExecutor Executor sequentialExecutor) {
35     this.sequentialExecutor = sequentialExecutor;
36   }
37 
38   /**
39    * Returns a SequentialFutureChain which can takes a number of asynchronous operation and turns
40    * them into a single asynchronous operation.
41    *
42    * <p>SequentialFutureChain provides a clearer way of writing a common idiom used to sequence a
43    * number of asynchrounous operations. The fragment
44    *
45    * <pre>{@code
46    * ListenableFuture<T> future = immediateFuture(init);
47    * future = transformAsync(future, arg -> asyncOp1, sequentialExecutor);
48    * future = transform(future, arg -> op2, sequentialExecutor);
49    * future = transformAsync(future, arg -> asyncOp3, sequentialExecutor);
50    * return future;
51    * }</pre>
52    *
53    * <p>can be rewritten as
54    *
55    * <pre>{@code
56    * return new FuturesUtil(sequentialExecutor)
57    *     .newSequentialChain(init)
58    *     .chainAsync(arg -> asyncOp1)
59    *     .chain(arg -> op2)
60    *     .chainAsync(arg -> asyncOp3)
61    *     .start();
62    * }</pre>
63    *
64    * <p>If any intermediate operation raises an exception, the whole chain raises an exception.
65    *
66    * <p>Note that sequentialExecutor must be a sequential executor, i.e. provide the sequentiality
67    * guarantees provided by {@link com.google.common.util.concurrent.SequentialExecutor}.
68    */
newSequentialChain(T init)69   public <T> SequentialFutureChain<T> newSequentialChain(T init) {
70     return new SequentialFutureChain<>(init);
71   }
72 
73   /**
74    * Create a SequentialFutureChain that doesn't compute a result.
75    *
76    * <p>If any intermediate operation raises an exception, the whole chain raises an exception.
77    *
78    * <p>Note that sequentialExecutor must be a sequential executor, i.e. provide the sequentiality
79    * guarantees provided by {@link com.google.common.util.concurrent.SequentialExecutor}.
80    */
newSequentialChain()81   public SequentialFutureChain<Void> newSequentialChain() {
82     return new SequentialFutureChain<>(null);
83   }
84 
85   /** Builds a list of Futurse to be executed sequentially. */
86   public final class SequentialFutureChain<T> {
87     private final List<FutureChainElement<T>> operations;
88     private final T init;
89 
SequentialFutureChain(T init)90     private SequentialFutureChain(T init) {
91       this.operations = new ArrayList<>();
92       this.init = init;
93     }
94 
95     @CanIgnoreReturnValue
chain(Function<T, T> operation)96     public SequentialFutureChain<T> chain(Function<T, T> operation) {
97       operations.add(new DirectFutureChainElement<>(operation));
98       return this;
99     }
100 
101     @CanIgnoreReturnValue
chainAsync(Function<T, ListenableFuture<T>> operation)102     public SequentialFutureChain<T> chainAsync(Function<T, ListenableFuture<T>> operation) {
103       operations.add(new AsyncFutureChainElement<>(operation));
104       return this;
105     }
106 
start()107     public ListenableFuture<T> start() {
108       ListenableFuture<T> result = immediateFuture(init);
109       for (FutureChainElement<T> operation : operations) {
110         result = operation.apply(result);
111       }
112       return result;
113     }
114   }
115 
116   private interface FutureChainElement<T> {
apply(ListenableFuture<T> input)117     abstract ListenableFuture<T> apply(ListenableFuture<T> input);
118   }
119 
120   private final class DirectFutureChainElement<T> implements FutureChainElement<T> {
121     private final Function<T, T> operation;
122 
DirectFutureChainElement(Function<T, T> operation)123     private DirectFutureChainElement(Function<T, T> operation) {
124       this.operation = operation;
125     }
126 
127     @Override
apply(ListenableFuture<T> input)128     public ListenableFuture<T> apply(ListenableFuture<T> input) {
129       return PropagatedFutures.transform(input, operation, sequentialExecutor);
130     }
131   }
132 
133   private final class AsyncFutureChainElement<T> implements FutureChainElement<T> {
134     private final Function<T, ListenableFuture<T>> operation;
135 
AsyncFutureChainElement(Function<T, ListenableFuture<T>> operation)136     private AsyncFutureChainElement(Function<T, ListenableFuture<T>> operation) {
137       this.operation = operation;
138     }
139 
140     @Override
apply(ListenableFuture<T> input)141     public ListenableFuture<T> apply(ListenableFuture<T> input) {
142       return PropagatedFutures.transformAsync(input, operation::apply, sequentialExecutor);
143     }
144   }
145 }
146