/*
 * Copyright (C) 2015 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
14 
15 package com.google.common.util.concurrent;
16 
17 import static com.google.common.base.Preconditions.checkNotNull;
18 import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE;
19 
20 import com.google.common.annotations.GwtCompatible;
21 import com.google.common.collect.ImmutableCollection;
22 import com.google.j2objc.annotations.WeakOuter;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.CancellationException;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.RejectedExecutionException;
28 import javax.annotation.CheckForNull;
29 import org.checkerframework.checker.nullness.qual.Nullable;
30 
31 /** Aggregate future that computes its value by calling a callable. */
32 @GwtCompatible
33 @ElementTypesAreNonnullByDefault
34 final class CombinedFuture<V extends @Nullable Object>
35     extends AggregateFuture<@Nullable Object, V> {
36   @CheckForNull private CombinedFutureInterruptibleTask<?> task;
37 
CombinedFuture( ImmutableCollection<? extends ListenableFuture<?>> futures, boolean allMustSucceed, Executor listenerExecutor, AsyncCallable<V> callable)38   CombinedFuture(
39       ImmutableCollection<? extends ListenableFuture<?>> futures,
40       boolean allMustSucceed,
41       Executor listenerExecutor,
42       AsyncCallable<V> callable) {
43     super(futures, allMustSucceed, false);
44     this.task = new AsyncCallableInterruptibleTask(callable, listenerExecutor);
45     init();
46   }
47 
CombinedFuture( ImmutableCollection<? extends ListenableFuture<?>> futures, boolean allMustSucceed, Executor listenerExecutor, Callable<V> callable)48   CombinedFuture(
49       ImmutableCollection<? extends ListenableFuture<?>> futures,
50       boolean allMustSucceed,
51       Executor listenerExecutor,
52       Callable<V> callable) {
53     super(futures, allMustSucceed, false);
54     this.task = new CallableInterruptibleTask(callable, listenerExecutor);
55     init();
56   }
57 
58   @Override
collectOneValue(int index, @CheckForNull Object returnValue)59   void collectOneValue(int index, @CheckForNull Object returnValue) {}
60 
61   @Override
handleAllCompleted()62   void handleAllCompleted() {
63     CombinedFutureInterruptibleTask<?> localTask = task;
64     if (localTask != null) {
65       localTask.execute();
66     }
67   }
68 
69   @Override
releaseResources(ReleaseResourcesReason reason)70   void releaseResources(ReleaseResourcesReason reason) {
71     super.releaseResources(reason);
72     /*
73      * If the output future is done, then it won't need to interrupt the task later, so it can clear
74      * its reference to it.
75      *
76      * If the output future is *not* done, then the task field will be cleared after the task runs
77      * or after the output future is done, whichever comes first.
78      */
79     if (reason == OUTPUT_FUTURE_DONE) {
80       this.task = null;
81     }
82   }
83 
84   @Override
interruptTask()85   protected void interruptTask() {
86     CombinedFutureInterruptibleTask<?> localTask = task;
87     if (localTask != null) {
88       localTask.interruptTask();
89     }
90   }
91 
92   @WeakOuter
93   private abstract class CombinedFutureInterruptibleTask<T extends @Nullable Object>
94       extends InterruptibleTask<T> {
95     private final Executor listenerExecutor;
96 
CombinedFutureInterruptibleTask(Executor listenerExecutor)97     CombinedFutureInterruptibleTask(Executor listenerExecutor) {
98       this.listenerExecutor = checkNotNull(listenerExecutor);
99     }
100 
101     @Override
isDone()102     final boolean isDone() {
103       return CombinedFuture.this.isDone();
104     }
105 
execute()106     final void execute() {
107       try {
108         listenerExecutor.execute(this);
109       } catch (RejectedExecutionException e) {
110         CombinedFuture.this.setException(e);
111       }
112     }
113 
114     @Override
afterRanInterruptiblySuccess(@arametricNullness T result)115     final void afterRanInterruptiblySuccess(@ParametricNullness T result) {
116       /*
117        * The future no longer needs to interrupt this task, so it no longer needs a reference to it.
118        *
119        * TODO(cpovirk): It might be nice for our InterruptibleTask subclasses to null out their
120        *  `callable` fields automatically. That would make it less important for us to null out the
121        * reference to `task` here (though it's still nice to do so in case our reference to the
122        * executor keeps it alive). Ideally, nulling out `callable` would be the responsibility of
123        * InterruptibleTask itself so that its other subclasses also benefit. (Handling `callable` in
124        * InterruptibleTask itself might also eliminate some of the existing boilerplate for, e.g.,
125        * pendingToString().)
126        */
127       CombinedFuture.this.task = null;
128 
129       setValue(result);
130     }
131 
132     @Override
afterRanInterruptiblyFailure(Throwable error)133     final void afterRanInterruptiblyFailure(Throwable error) {
134       // See afterRanInterruptiblySuccess.
135       CombinedFuture.this.task = null;
136 
137       if (error instanceof ExecutionException) {
138         /*
139          * Cast to ExecutionException to satisfy our nullness checker, which (unsoundly but
140          * *usually* safely) assumes that getCause() returns non-null on an ExecutionException.
141          */
142         CombinedFuture.this.setException(((ExecutionException) error).getCause());
143       } else if (error instanceof CancellationException) {
144         cancel(false);
145       } else {
146         CombinedFuture.this.setException(error);
147       }
148     }
149 
setValue(@arametricNullness T value)150     abstract void setValue(@ParametricNullness T value);
151   }
152 
153   @WeakOuter
154   private final class AsyncCallableInterruptibleTask
155       extends CombinedFutureInterruptibleTask<ListenableFuture<V>> {
156     private final AsyncCallable<V> callable;
157 
AsyncCallableInterruptibleTask(AsyncCallable<V> callable, Executor listenerExecutor)158     AsyncCallableInterruptibleTask(AsyncCallable<V> callable, Executor listenerExecutor) {
159       super(listenerExecutor);
160       this.callable = checkNotNull(callable);
161     }
162 
163     @Override
runInterruptibly()164     ListenableFuture<V> runInterruptibly() throws Exception {
165       ListenableFuture<V> result = callable.call();
166       return checkNotNull(
167           result,
168           "AsyncCallable.call returned null instead of a Future. "
169               + "Did you mean to return immediateFuture(null)? %s",
170           callable);
171     }
172 
173     @Override
setValue(ListenableFuture<V> value)174     void setValue(ListenableFuture<V> value) {
175       CombinedFuture.this.setFuture(value);
176     }
177 
178     @Override
toPendingString()179     String toPendingString() {
180       return callable.toString();
181     }
182   }
183 
184   @WeakOuter
185   private final class CallableInterruptibleTask extends CombinedFutureInterruptibleTask<V> {
186     private final Callable<V> callable;
187 
CallableInterruptibleTask(Callable<V> callable, Executor listenerExecutor)188     CallableInterruptibleTask(Callable<V> callable, Executor listenerExecutor) {
189       super(listenerExecutor);
190       this.callable = checkNotNull(callable);
191     }
192 
193     @Override
194     @ParametricNullness
runInterruptibly()195     V runInterruptibly() throws Exception {
196       return callable.call();
197     }
198 
199     @Override
setValue(@arametricNullness V value)200     void setValue(@ParametricNullness V value) {
201       CombinedFuture.this.set(value);
202     }
203 
204     @Override
toPendingString()205     String toPendingString() {
206       return callable.toString();
207     }
208   }
209 }
210