• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkState;
20 import static com.google.common.collect.Lists.asList;
21 import static com.google.common.truth.Truth.assertThat;
22 import static com.google.common.truth.Truth.assertWithMessage;
23 import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
24 import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
25 import static com.google.common.util.concurrent.Futures.immediateFuture;
26 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
27 import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination;
28 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
29 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
30 import static java.util.Arrays.asList;
31 import static java.util.concurrent.Executors.newSingleThreadExecutor;
32 import static java.util.concurrent.TimeUnit.SECONDS;
33 import static org.junit.Assert.assertThrows;
34 import static org.mockito.Mockito.doThrow;
35 import static org.mockito.Mockito.timeout;
36 import static org.mockito.Mockito.verify;
37 
38 import com.google.common.collect.ImmutableList;
39 import com.google.common.reflect.Reflection;
40 import com.google.common.truth.FailureStrategy;
41 import com.google.common.truth.StandardSubjectBuilder;
42 import com.google.common.util.concurrent.ClosingFuture.AsyncClosingCallable;
43 import com.google.common.util.concurrent.ClosingFuture.AsyncClosingFunction;
44 import com.google.common.util.concurrent.ClosingFuture.ClosingCallable;
45 import com.google.common.util.concurrent.ClosingFuture.ClosingFunction;
46 import com.google.common.util.concurrent.ClosingFuture.Combiner;
47 import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
48 import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
49 import com.google.common.util.concurrent.ClosingFuture.Combiner2.AsyncClosingFunction2;
50 import com.google.common.util.concurrent.ClosingFuture.Combiner2.ClosingFunction2;
51 import com.google.common.util.concurrent.ClosingFuture.Combiner3.ClosingFunction3;
52 import com.google.common.util.concurrent.ClosingFuture.Combiner4.ClosingFunction4;
53 import com.google.common.util.concurrent.ClosingFuture.Combiner5.ClosingFunction5;
54 import com.google.common.util.concurrent.ClosingFuture.DeferredCloser;
55 import com.google.common.util.concurrent.ClosingFuture.Peeker;
56 import com.google.common.util.concurrent.ClosingFuture.ValueAndCloser;
57 import com.google.common.util.concurrent.ClosingFuture.ValueAndCloserConsumer;
58 import java.io.Closeable;
59 import java.io.IOException;
60 import java.io.PrintWriter;
61 import java.io.StringWriter;
62 import java.lang.reflect.InvocationHandler;
63 import java.lang.reflect.InvocationTargetException;
64 import java.lang.reflect.Method;
65 import java.util.ArrayList;
66 import java.util.List;
67 import java.util.concurrent.Callable;
68 import java.util.concurrent.CancellationException;
69 import java.util.concurrent.CountDownLatch;
70 import java.util.concurrent.ExecutionException;
71 import java.util.concurrent.Executor;
72 import java.util.concurrent.ExecutorService;
73 import java.util.concurrent.Future;
74 import java.util.concurrent.RejectedExecutionException;
75 import java.util.concurrent.atomic.AtomicReference;
76 import junit.framework.TestCase;
77 import org.mockito.Mockito;
78 
79 /**
80  * Tests for {@link ClosingFuture}. Subclasses exercise either the {@link
81  * ClosingFuture#finishToFuture()} or {@link
82  * ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} paths to complete a
83  * {@link ClosingFuture} pipeline.
84  */
85 public abstract class AbstractClosingFutureTest extends TestCase {
86   // TODO(dpb): Use Expect once that supports JUnit 3, or we can use JUnit 4.
87   final List<AssertionError> failures = new ArrayList<>();
88   final StandardSubjectBuilder expect =
89       StandardSubjectBuilder.forCustomFailureStrategy(
90           new FailureStrategy() {
91             @Override
92             public void fail(AssertionError failure) {
93               failures.add(failure);
94             }
95           });
96 
97   final ListeningExecutorService executor =
98       MoreExecutors.listeningDecorator(newSingleThreadExecutor());
99   final ExecutorService closingExecutor = newSingleThreadExecutor();
100 
101   final TestCloseable closeable1 = new TestCloseable("closeable1");
102   final TestCloseable closeable2 = new TestCloseable("closeable2");
103   final TestCloseable closeable3 = new TestCloseable("closeable3");
104   final TestCloseable closeable4 = new TestCloseable("closeable4");
105 
106   final Waiter waiter = new Waiter();
107   final CountDownLatch futureCancelled = new CountDownLatch(1);
108   final Exception exception = new Exception();
109   final Closeable mockCloseable = Mockito.mock(Closeable.class);
110 
111   @Override
tearDown()112   protected void tearDown() throws Exception {
113     assertNoExpectedFailures();
114     super.tearDown();
115   }
116 
testFrom()117   public void testFrom() throws Exception {
118     ClosingFuture<String> closingFuture =
119         ClosingFuture.from(executor.submit(Callables.returning(closeable1)))
120             .transform(
121                 new ClosingFunction<TestCloseable, String>() {
122                   @Override
123                   public String apply(DeferredCloser closer, TestCloseable v) throws Exception {
124                     assertThat(v).isSameInstanceAs(closeable1);
125                     return "value";
126                   }
127                 },
128                 executor);
129     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
130     waitUntilClosed(closingFuture);
131     assertStillOpen(closeable1);
132   }
133 
testFrom_failedInput()134   public void testFrom_failedInput() throws Exception {
135     assertFinallyFailsWithException(failedClosingFuture());
136   }
137 
testFrom_cancelledInput()138   public void testFrom_cancelledInput() throws Exception {
139     assertBecomesCanceled(ClosingFuture.from(immediateCancelledFuture()));
140   }
141 
testEventuallyClosing()142   public void testEventuallyClosing() throws Exception {
143     ClosingFuture<String> closingFuture =
144         ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor)
145             .transform(
146                 new ClosingFunction<TestCloseable, String>() {
147                   @Override
148                   public String apply(DeferredCloser closer, TestCloseable v) throws Exception {
149                     assertThat(v).isSameInstanceAs(closeable1);
150                     assertStillOpen(closeable1);
151                     return "value";
152                   }
153                 },
154                 executor);
155     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
156     waitUntilClosed(closingFuture);
157     assertClosed(closeable1);
158   }
159 
testEventuallyClosing_failedInput()160   public void testEventuallyClosing_failedInput() throws Exception {
161     assertFinallyFailsWithException(
162         ClosingFuture.eventuallyClosing(
163             Futures.<Closeable>immediateFailedFuture(exception), closingExecutor));
164   }
165 
testEventuallyClosing_cancelledInput()166   public void testEventuallyClosing_cancelledInput() throws Exception {
167     assertBecomesCanceled(
168         ClosingFuture.eventuallyClosing(
169             Futures.<Closeable>immediateCancelledFuture(), closingExecutor));
170   }
171 
testEventuallyClosing_cancelledPipeline()172   public void testEventuallyClosing_cancelledPipeline() throws Exception {
173     ClosingFuture<TestCloseable> closingFuture =
174         ClosingFuture.eventuallyClosing(
175             executor.submit(
176                 waiter.waitFor(
177                     new Callable<TestCloseable>() {
178                       @Override
179                       public TestCloseable call() throws InterruptedException {
180                         awaitUninterruptibly(futureCancelled);
181                         return closeable1;
182                       }
183                     })),
184             closingExecutor);
185     waiter.awaitStarted();
186     cancelFinalStepAndWait(closingFuture);
187     // not closed until the callable returns
188     assertStillOpen(closeable1);
189     waiter.awaitReturned();
190     assertClosed(closeable1);
191   }
192 
testEventuallyClosing_throws()193   public void testEventuallyClosing_throws() throws Exception {
194     assertFinallyFailsWithException(
195         ClosingFuture.eventuallyClosing(
196             executor.submit(
197                 new Callable<TestCloseable>() {
198                   @Override
199                   public TestCloseable call() throws Exception {
200                     throw exception;
201                   }
202                 }),
203             closingExecutor));
204   }
205 
testSubmit()206   public void testSubmit() throws Exception {
207     ClosingFuture<String> closingFuture =
208         ClosingFuture.submit(
209                 new ClosingCallable<TestCloseable>() {
210                   @Override
211                   public TestCloseable call(DeferredCloser closer) throws Exception {
212                     closer.eventuallyClose(closeable1, closingExecutor);
213                     closer.eventuallyClose(closeable2, closingExecutor);
214                     return closeable3;
215                   }
216                 },
217                 executor)
218             .transform(
219                 new ClosingFunction<TestCloseable, String>() {
220                   @Override
221                   public String apply(DeferredCloser closer, TestCloseable v) throws Exception {
222                     assertThat(v).isSameInstanceAs(closeable3);
223                     assertStillOpen(closeable1, closeable2, closeable3);
224                     return "value";
225                   }
226                 },
227                 executor);
228     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
229     waitUntilClosed(closingFuture);
230     assertClosed(closeable1, closeable2);
231     assertStillOpen(closeable3);
232   }
233 
testSubmit_cancelledPipeline()234   public void testSubmit_cancelledPipeline() throws Exception {
235     ClosingFuture<TestCloseable> closingFuture =
236         ClosingFuture.submit(
237             waiter.waitFor(
238                 new ClosingCallable<TestCloseable>() {
239                   @Override
240                   public TestCloseable call(DeferredCloser closer) throws Exception {
241                     awaitUninterruptibly(futureCancelled);
242                     closer.eventuallyClose(closeable1, closingExecutor);
243                     closer.eventuallyClose(closeable2, closingExecutor);
244                     return closeable3;
245                   }
246                 }),
247             executor);
248     waiter.awaitStarted();
249     cancelFinalStepAndWait(closingFuture);
250     waiter.awaitReturned();
251     assertClosed(closeable1, closeable2);
252     assertStillOpen(closeable3);
253   }
254 
testSubmit_throws()255   public void testSubmit_throws() throws Exception {
256     ClosingFuture<Object> closingFuture =
257         ClosingFuture.submit(
258             new ClosingCallable<Object>() {
259               @Override
260               public Object call(DeferredCloser closer) throws Exception {
261                 closer.eventuallyClose(closeable1, closingExecutor);
262                 closer.eventuallyClose(closeable2, closingExecutor);
263                 throw exception;
264               }
265             },
266             executor);
267     assertFinallyFailsWithException(closingFuture);
268     waitUntilClosed(closingFuture);
269     assertClosed(closeable1, closeable2);
270   }
271 
testSubmitAsync()272   public void testSubmitAsync() throws Exception {
273     ClosingFuture<TestCloseable> closingFuture =
274         ClosingFuture.submitAsync(
275             new AsyncClosingCallable<TestCloseable>() {
276               @Override
277               public ClosingFuture<TestCloseable> call(DeferredCloser closer) {
278                 closer.eventuallyClose(closeable1, closingExecutor);
279                 return ClosingFuture.submit(
280                     new ClosingCallable<TestCloseable>() {
281                       @Override
282                       public TestCloseable call(DeferredCloser deferredCloser) throws Exception {
283                         return closeable2;
284                       }
285                     },
286                     directExecutor());
287               }
288             },
289             executor);
290     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
291     waitUntilClosed(closingFuture);
292     assertClosed(closeable1);
293     assertStillOpen(closeable2);
294   }
295 
testSubmitAsync_cancelledPipeline()296   public void testSubmitAsync_cancelledPipeline() throws Exception {
297     ClosingFuture<TestCloseable> closingFuture =
298         ClosingFuture.submitAsync(
299             waiter.waitFor(
300                 new AsyncClosingCallable<TestCloseable>() {
301                   @Override
302                   public ClosingFuture<TestCloseable> call(DeferredCloser closer) throws Exception {
303                     awaitUninterruptibly(futureCancelled);
304                     closer.eventuallyClose(closeable1, closingExecutor);
305                     closer.eventuallyClose(closeable2, closingExecutor);
306                     return ClosingFuture.submit(
307                         new ClosingCallable<TestCloseable>() {
308                           @Override
309                           public TestCloseable call(DeferredCloser deferredCloser)
310                               throws Exception {
311                             deferredCloser.eventuallyClose(closeable3, closingExecutor);
312                             return closeable3;
313                           }
314                         },
315                         directExecutor());
316                   }
317                 }),
318             executor);
319     waiter.awaitStarted();
320     cancelFinalStepAndWait(closingFuture);
321     waiter.awaitReturned();
322     assertClosed(closeable1, closeable2, closeable3);
323   }
324 
testSubmitAsync_throws()325   public void testSubmitAsync_throws() throws Exception {
326     ClosingFuture<Object> closingFuture =
327         ClosingFuture.submitAsync(
328             new AsyncClosingCallable<Object>() {
329               @Override
330               public ClosingFuture<Object> call(DeferredCloser closer) throws Exception {
331                 closer.eventuallyClose(closeable1, closingExecutor);
332                 closer.eventuallyClose(closeable2, closingExecutor);
333                 throw exception;
334               }
335             },
336             executor);
337     assertFinallyFailsWithException(closingFuture);
338     waitUntilClosed(closingFuture);
339     assertClosed(closeable1, closeable2);
340   }
341 
testAutoCloseable()342   public void testAutoCloseable() throws Exception {
343     AutoCloseable autoCloseable = closeable1::close;
344     ClosingFuture<String> closingFuture =
345         ClosingFuture.submit(
346             new ClosingCallable<String>() {
347               @Override
348               public String call(DeferredCloser closer) throws Exception {
349                 closer.eventuallyClose(autoCloseable, closingExecutor);
350                 return "";
351               }
352             },
353             executor);
354     assertThat(getFinalValue(closingFuture)).isEqualTo("");
355     waitUntilClosed(closingFuture);
356     assertClosed(closeable1);
357   }
358 
testStatusFuture()359   public void testStatusFuture() throws Exception {
360     ClosingFuture<String> closingFuture =
361         ClosingFuture.submit(
362             waiter.waitFor(
363                 new ClosingCallable<String>() {
364                   @Override
365                   public String call(DeferredCloser closer) throws Exception {
366                     return "value";
367                   }
368                 }),
369             executor);
370     ListenableFuture<?> statusFuture = closingFuture.statusFuture();
371     waiter.awaitStarted();
372     assertThat(statusFuture.isDone()).isFalse();
373     waiter.awaitReturned();
374     assertThat(getUninterruptibly(statusFuture)).isNull();
375   }
376 
testStatusFuture_failure()377   public void testStatusFuture_failure() throws Exception {
378     ClosingFuture<String> closingFuture =
379         ClosingFuture.submit(
380             waiter.waitFor(
381                 new ClosingCallable<String>() {
382                   @Override
383                   public String call(DeferredCloser closer) throws Exception {
384                     throw exception;
385                   }
386                 }),
387             executor);
388     ListenableFuture<?> statusFuture = closingFuture.statusFuture();
389     waiter.awaitStarted();
390     assertThat(statusFuture.isDone()).isFalse();
391     waiter.awaitReturned();
392     assertThatFutureFailsWithException(statusFuture);
393   }
394 
testStatusFuture_cancelDoesNothing()395   public void testStatusFuture_cancelDoesNothing() throws Exception {
396     ClosingFuture<String> closingFuture =
397         ClosingFuture.submit(
398             waiter.waitFor(
399                 new ClosingCallable<String>() {
400                   @Override
401                   public String call(DeferredCloser closer) throws Exception {
402                     return "value";
403                   }
404                 }),
405             executor);
406     ListenableFuture<?> statusFuture = closingFuture.statusFuture();
407     waiter.awaitStarted();
408     assertThat(statusFuture.isDone()).isFalse();
409     statusFuture.cancel(true);
410     assertThat(statusFuture.isCancelled()).isTrue();
411     waiter.awaitReturned();
412     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
413   }
414 
testCancel_caught()415   public void testCancel_caught() throws Exception {
416     ClosingFuture<String> step0 = ClosingFuture.from(immediateFuture("value 0"));
417     ClosingFuture<String> step1 =
418         step0.transform(
419             new ClosingFunction<String, String>() {
420               @Override
421               public String apply(DeferredCloser closer, String v) throws Exception {
422                 closer.eventuallyClose(closeable1, closingExecutor);
423                 return "value 1";
424               }
425             },
426             executor);
427     Waiter step2Waiter = new Waiter();
428     ClosingFuture<String> step2 =
429         step1.transform(
430             step2Waiter.waitFor(
431                 new ClosingFunction<String, String>() {
432                   @Override
433                   public String apply(DeferredCloser closer, String v) throws Exception {
434                     closer.eventuallyClose(closeable2, closingExecutor);
435                     return "value 2";
436                   }
437                 }),
438             executor);
439     ClosingFuture<String> step3 =
440         step2.transform(
441             new ClosingFunction<String, String>() {
442               @Override
443               public String apply(DeferredCloser closer, String input) throws Exception {
444                 closer.eventuallyClose(closeable3, closingExecutor);
445                 return "value 3";
446               }
447             },
448             executor);
449     Waiter step4Waiter = new Waiter();
450     ClosingFuture<String> step4 =
451         step3.catching(
452             CancellationException.class,
453             step4Waiter.waitFor(
454                 new ClosingFunction<CancellationException, String>() {
455                   @Override
456                   public String apply(DeferredCloser closer, CancellationException input)
457                       throws Exception {
458                     closer.eventuallyClose(closeable4, closingExecutor);
459                     return "value 4";
460                   }
461                 }),
462             executor);
463 
464     // Pause in step 2.
465     step2Waiter.awaitStarted();
466 
467     // Everything should still be open.
468     assertStillOpen(closeable1, closeable2, closeable3, closeable4);
469 
470     // Cancel step 3, resume step 2, and pause in step 4.
471     assertWithMessage("step3.cancel()").that(step3.cancel(false)).isTrue();
472     step2Waiter.awaitReturned();
473     step4Waiter.awaitStarted();
474 
475     // Step 1 is not cancelled because it was done.
476     assertWithMessage("step1.statusFuture().isCancelled()")
477         .that(step1.statusFuture().isCancelled())
478         .isFalse();
479     // But its closeable is closed.
480     assertClosed(closeable1);
481 
482     // Step 2 is cancelled because it wasn't complete.
483     assertWithMessage("step2.statusFuture().isCancelled()")
484         .that(step2.statusFuture().isCancelled())
485         .isTrue();
486     // Its closeable is closed.
487     assertClosed(closeable2);
488 
489     // Step 3 was cancelled before it began
490     assertWithMessage("step3.statusFuture().isCancelled()")
491         .that(step3.statusFuture().isCancelled())
492         .isTrue();
493     // Its closeable is still open.
494     assertStillOpen(closeable3);
495 
496     // Step 4 is not cancelled, because it caught the cancellation.
497     assertWithMessage("step4.statusFuture().isCancelled()")
498         .that(step4.statusFuture().isCancelled())
499         .isFalse();
500     // Its closeable isn't closed yet.
501     assertStillOpen(closeable4);
502 
503     // Resume step 4 and complete.
504     step4Waiter.awaitReturned();
505     assertThat(getFinalValue(step4)).isEqualTo("value 4");
506 
507     // Step 4's closeable is now closed.
508     assertClosed(closeable4);
509     // Step 3 still never ran, so its closeable should still be open.
510     assertStillOpen(closeable3);
511   }
512 
testTransform()513   public void testTransform() throws Exception {
514     ClosingFuture<String> closingFuture =
515         ClosingFuture.from(immediateFuture("value"))
516             .transform(
517                 new ClosingFunction<String, TestCloseable>() {
518                   @Override
519                   public TestCloseable apply(DeferredCloser closer, String v) throws Exception {
520                     closer.eventuallyClose(closeable1, closingExecutor);
521                     closer.eventuallyClose(closeable2, closingExecutor);
522                     return closeable3;
523                   }
524                 },
525                 executor)
526             .transform(
527                 new ClosingFunction<TestCloseable, String>() {
528                   @Override
529                   public String apply(DeferredCloser closer, TestCloseable v) throws Exception {
530                     assertThat(v).isSameInstanceAs(closeable3);
531                     assertStillOpen(closeable1, closeable2, closeable3);
532                     return "value";
533                   }
534                 },
535                 executor);
536     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
537     waitUntilClosed(closingFuture);
538     assertClosed(closeable1, closeable2);
539     assertStillOpen(closeable3);
540   }
541 
testTransform_cancelledPipeline()542   public void testTransform_cancelledPipeline() throws Exception {
543     String value = "value";
544     ClosingFuture<TestCloseable> closingFuture =
545         ClosingFuture.from(immediateFuture(value))
546             .transform(
547                 new ClosingFunction<String, TestCloseable>() {
548                   @Override
549                   public TestCloseable apply(DeferredCloser closer, String v) throws Exception {
550                     return closer.eventuallyClose(closeable1, closingExecutor);
551                   }
552                 },
553                 executor)
554             .transform(
555                 waiter.waitFor(
556                     new ClosingFunction<TestCloseable, TestCloseable>() {
557                       @Override
558                       public TestCloseable apply(DeferredCloser closer, TestCloseable v)
559                           throws Exception {
560                         awaitUninterruptibly(futureCancelled);
561                         closer.eventuallyClose(closeable2, closingExecutor);
562                         closer.eventuallyClose(closeable3, closingExecutor);
563                         return closeable4;
564                       }
565                     }),
566                 executor);
567     waiter.awaitStarted();
568     cancelFinalStepAndWait(closingFuture);
569     waiter.awaitReturned();
570     assertClosed(closeable1, closeable2, closeable3);
571     assertStillOpen(closeable4);
572   }
573 
testTransform_throws()574   public void testTransform_throws() throws Exception {
575     ClosingFuture<Object> closingFuture =
576         ClosingFuture.from(immediateFuture("value"))
577             .transform(
578                 new ClosingFunction<String, Object>() {
579                   @Override
580                   public Object apply(DeferredCloser closer, String v) throws Exception {
581                     closer.eventuallyClose(closeable1, closingExecutor);
582                     closer.eventuallyClose(closeable2, closingExecutor);
583                     throw exception;
584                   }
585                 },
586                 executor);
587     assertFinallyFailsWithException(closingFuture);
588     waitUntilClosed(closingFuture);
589     assertClosed(closeable1, closeable2);
590   }
591 
testTransformAsync()592   public void testTransformAsync() throws Exception {
593     ClosingFuture<String> closingFuture =
594         ClosingFuture.from(immediateFuture("value"))
595             .transformAsync(
596                 new AsyncClosingFunction<String, TestCloseable>() {
597                   @Override
598                   public ClosingFuture<TestCloseable> apply(DeferredCloser closer, String v)
599                       throws Exception {
600                     closer.eventuallyClose(closeable1, closingExecutor);
601                     closer.eventuallyClose(closeable2, closingExecutor);
602                     return ClosingFuture.eventuallyClosing(
603                         immediateFuture(closeable3), closingExecutor);
604                   }
605                 },
606                 executor)
607             .transform(
608                 new ClosingFunction<TestCloseable, String>() {
609                   @Override
610                   public String apply(DeferredCloser closer, TestCloseable v) throws Exception {
611                     assertThat(v).isSameInstanceAs(closeable3);
612                     assertStillOpen(closeable1, closeable2, closeable3);
613                     return "value";
614                   }
615                 },
616                 executor);
617     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
618     waitUntilClosed(closingFuture);
619     assertClosed(closeable1, closeable2, closeable3);
620   }
621 
testTransformAsync_cancelledPipeline()622   public void testTransformAsync_cancelledPipeline() throws Exception {
623     ClosingFuture<TestCloseable> closingFuture =
624         ClosingFuture.from(immediateFuture("value"))
625             .transformAsync(
626                 waiter.waitFor(
627                     new AsyncClosingFunction<String, TestCloseable>() {
628                       @Override
629                       public ClosingFuture<TestCloseable> apply(DeferredCloser closer, String v)
630                           throws Exception {
631                         awaitUninterruptibly(futureCancelled);
632                         closer.eventuallyClose(closeable1, closingExecutor);
633                         closer.eventuallyClose(closeable2, closingExecutor);
634                         return ClosingFuture.eventuallyClosing(
635                             immediateFuture(closeable3), closingExecutor);
636                       }
637                     }),
638                 executor);
639     waiter.awaitStarted();
640     cancelFinalStepAndWait(closingFuture);
641     // not closed until the function returns
642     assertStillOpen(closeable1, closeable2, closeable3);
643     waiter.awaitReturned();
644     assertClosed(closeable1, closeable2, closeable3);
645   }
646 
testTransformAsync_throws()647   public void testTransformAsync_throws() throws Exception {
648     ClosingFuture<Object> closingFuture =
649         ClosingFuture.from(immediateFuture("value"))
650             .transformAsync(
651                 new AsyncClosingFunction<String, Object>() {
652                   @Override
653                   public ClosingFuture<Object> apply(DeferredCloser closer, String v)
654                       throws Exception {
655                     closer.eventuallyClose(closeable1, closingExecutor);
656                     closer.eventuallyClose(closeable2, closingExecutor);
657                     throw exception;
658                   }
659                 },
660                 executor);
661     assertFinallyFailsWithException(closingFuture);
662     waitUntilClosed(closingFuture);
663     assertClosed(closeable1, closeable2);
664   }
665 
testTransformAsync_failed()666   public void testTransformAsync_failed() throws Exception {
667     ClosingFuture<Object> closingFuture =
668         ClosingFuture.from(immediateFuture("value"))
669             .transformAsync(
670                 new AsyncClosingFunction<String, Object>() {
671                   @Override
672                   public ClosingFuture<Object> apply(DeferredCloser closer, String v)
673                       throws Exception {
674                     closer.eventuallyClose(closeable1, closingExecutor);
675                     closer.eventuallyClose(closeable2, closingExecutor);
676                     return failedClosingFuture();
677                   }
678                 },
679                 executor);
680     assertFinallyFailsWithException(closingFuture);
681     waitUntilClosed(closingFuture);
682     assertClosed(closeable1, closeable2);
683   }
684 
testTransformAsync_withoutCloser()685   public void testTransformAsync_withoutCloser() throws Exception {
686     ClosingFuture<String> closingFuture =
687         ClosingFuture.submit(
688                 new ClosingCallable<TestCloseable>() {
689                   @Override
690                   public TestCloseable call(DeferredCloser closer) throws Exception {
691                     return closer.eventuallyClose(closeable1, closingExecutor);
692                   }
693                 },
694                 executor)
695             .transformAsync(
696                 ClosingFuture.withoutCloser(
697                     new AsyncFunction<TestCloseable, String>() {
698                       @Override
699                       public ListenableFuture<String> apply(TestCloseable v) throws Exception {
700                         assertThat(v).isSameInstanceAs(closeable1);
701                         assertStillOpen(closeable1);
702                         return immediateFuture("value");
703                       }
704                     }),
705                 executor);
706     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
707     waitUntilClosed(closingFuture);
708     assertClosed(closeable1);
709   }
710 
testWhenAllComplete_call()711   public void testWhenAllComplete_call() throws Exception {
712     final ClosingFuture<String> input1 = ClosingFuture.from(immediateFuture("value1"));
713     final ClosingFuture<Object> input2Failed = failedClosingFuture();
714     final ClosingFuture<String> nonInput = ClosingFuture.from(immediateFuture("value3"));
715     final AtomicReference<ClosingFuture.Peeker> capturedPeeker = new AtomicReference<>();
716     ClosingFuture<TestCloseable> closingFuture =
717         ClosingFuture.whenAllComplete(ImmutableList.of(input1, input2Failed))
718             .call(
719                 new CombiningCallable<TestCloseable>() {
720                   @Override
721                   public TestCloseable call(DeferredCloser closer, Peeker peeker) throws Exception {
722                     closer.eventuallyClose(closeable1, closingExecutor);
723                     assertThat(peeker.getDone(input1)).isSameInstanceAs("value1");
724                     try {
725                       peeker.getDone(input2Failed);
726                       fail("Peeker.getDone() should fail for failed inputs");
727                     } catch (ExecutionException expected) {
728                     }
729                     try {
730                       peeker.getDone(nonInput);
731                       fail("Peeker should not be able to peek into non-input ClosingFuture.");
732                     } catch (IllegalArgumentException expected) {
733                     }
734                     capturedPeeker.set(peeker);
735                     return closeable2;
736                   }
737                 },
738                 executor);
739     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
740     waitUntilClosed(closingFuture);
741     assertStillOpen(closeable2);
742     assertClosed(closeable1);
743     assertThrows(IllegalStateException.class, () -> capturedPeeker.get().getDone(input1));
744   }
745 
testWhenAllComplete_call_cancelledPipeline()746   public void testWhenAllComplete_call_cancelledPipeline() throws Exception {
747     ClosingFuture<TestCloseable> closingFuture =
748         ClosingFuture.whenAllComplete(
749                 ImmutableList.of(
750                     ClosingFuture.from(immediateFuture(closeable1)),
751                     ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
752             .call(
753                 waiter.waitFor(
754                     new CombiningCallable<TestCloseable>() {
755                       @Override
756                       public TestCloseable call(DeferredCloser closer, Peeker peeker)
757                           throws Exception {
758                         awaitUninterruptibly(futureCancelled);
759                         closer.eventuallyClose(closeable1, closingExecutor);
760                         return closeable3;
761                       }
762                     }),
763                 executor);
764     waiter.awaitStarted();
765     cancelFinalStepAndWait(closingFuture);
766     waiter.awaitReturned();
767     assertClosed(closeable1, closeable2);
768     assertStillOpen(closeable3);
769   }
770 
testWhenAllComplete_call_throws()771   public void testWhenAllComplete_call_throws() throws Exception {
772     ClosingFuture<Object> closingFuture =
773         ClosingFuture.whenAllComplete(
774                 ImmutableList.of(
775                     ClosingFuture.from(immediateFuture(closeable1)),
776                     ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
777             .call(
778                 new CombiningCallable<Object>() {
779                   @Override
780                   public Object call(DeferredCloser closer, Peeker peeker) throws Exception {
781                     closer.eventuallyClose(closeable3, closingExecutor);
782                     throw exception;
783                   }
784                 },
785                 executor);
786     assertFinallyFailsWithException(closingFuture);
787     waitUntilClosed(closingFuture);
788     assertStillOpen(closeable1);
789     assertClosed(closeable2, closeable3);
790   }
791 
testWhenAllComplete_callAsync()792   public void testWhenAllComplete_callAsync() throws Exception {
793     final ClosingFuture<String> input1 = ClosingFuture.from(immediateFuture("value1"));
794     final ClosingFuture<Object> input2Failed = failedClosingFuture();
795     final ClosingFuture<String> nonInput = ClosingFuture.from(immediateFuture("value3"));
796     final AtomicReference<ClosingFuture.Peeker> capturedPeeker = new AtomicReference<>();
797     ClosingFuture<TestCloseable> closingFuture =
798         ClosingFuture.whenAllComplete(ImmutableList.of(input1, input2Failed))
799             .callAsync(
800                 new AsyncCombiningCallable<TestCloseable>() {
801                   @Override
802                   public ClosingFuture<TestCloseable> call(DeferredCloser closer, Peeker peeker)
803                       throws Exception {
804                     closer.eventuallyClose(closeable1, closingExecutor);
805                     assertThat(peeker.getDone(input1)).isSameInstanceAs("value1");
806                     try {
807                       peeker.getDone(input2Failed);
808                       fail("Peeker should fail for failed inputs");
809                     } catch (ExecutionException expected) {
810                     }
811                     try {
812                       peeker.getDone(nonInput);
813                       fail("Peeker should not be able to peek into non-input ClosingFuture.");
814                     } catch (IllegalArgumentException expected) {
815                     }
816                     capturedPeeker.set(peeker);
817                     return ClosingFuture.eventuallyClosing(
818                         immediateFuture(closeable2), closingExecutor);
819                   }
820                 },
821                 executor);
822     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
823     waitUntilClosed(closingFuture);
824     assertClosed(closeable1, closeable2);
825     assertThrows(IllegalStateException.class, () -> capturedPeeker.get().getDone(input1));
826   }
827 
testWhenAllComplete_callAsync_cancelledPipeline()828   public void testWhenAllComplete_callAsync_cancelledPipeline() throws Exception {
829     ClosingFuture<TestCloseable> closingFuture =
830         ClosingFuture.whenAllComplete(
831                 ImmutableList.of(
832                     ClosingFuture.from(immediateFuture(closeable1)),
833                     ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
834             .callAsync(
835                 waiter.waitFor(
836                     new AsyncCombiningCallable<TestCloseable>() {
837                       @Override
838                       public ClosingFuture<TestCloseable> call(DeferredCloser closer, Peeker peeker)
839                           throws Exception {
840                         awaitUninterruptibly(futureCancelled);
841                         closer.eventuallyClose(closeable1, closingExecutor);
842                         return ClosingFuture.eventuallyClosing(
843                             immediateFuture(closeable3), closingExecutor);
844                       }
845                     }),
846                 executor);
847     waiter.awaitStarted();
848     cancelFinalStepAndWait(closingFuture);
849     waiter.awaitReturned();
850     assertClosed(closeable1, closeable2, closeable3);
851   }
852 
testWhenAllComplete_callAsync_throws()853   public void testWhenAllComplete_callAsync_throws() throws Exception {
854     ClosingFuture<Object> closingFuture =
855         ClosingFuture.whenAllComplete(
856                 ImmutableList.of(
857                     ClosingFuture.from(immediateFuture(closeable1)),
858                     ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
859             .callAsync(
860                 new AsyncCombiningCallable<Object>() {
861                   @Override
862                   public ClosingFuture<Object> call(DeferredCloser closer, Peeker peeker)
863                       throws Exception {
864                     closer.eventuallyClose(closeable3, closingExecutor);
865                     throw exception;
866                   }
867                 },
868                 executor);
869     assertFinallyFailsWithException(closingFuture);
870     waitUntilClosed(closingFuture);
871     assertStillOpen(closeable1);
872     assertClosed(closeable2, closeable3);
873   }
874 
875   // We don't need to test the happy case for SuccessfulCombiner.call(Async) because it's the same
876   // as Combiner.
877 
testWhenAllSucceed_call_failedInput()878   public void testWhenAllSucceed_call_failedInput() throws Exception {
879     assertFinallyFailsWithException(
880         ClosingFuture.whenAllSucceed(
881                 ImmutableList.of(
882                     ClosingFuture.from(immediateFuture("value")), failedClosingFuture()))
883             .call(
884                 new CombiningCallable<Object>() {
885                   @Override
886                   public Object call(DeferredCloser closer, Peeker peeker) throws Exception {
887                     expect.fail();
888                     throw new AssertionError();
889                   }
890                 },
891                 executor));
892   }
893 
testWhenAllSucceed_callAsync_failedInput()894   public void testWhenAllSucceed_callAsync_failedInput() throws Exception {
895     assertFinallyFailsWithException(
896         ClosingFuture.whenAllSucceed(
897                 ImmutableList.of(
898                     ClosingFuture.from(immediateFuture("value")), failedClosingFuture()))
899             .callAsync(
900                 new AsyncCombiningCallable<Object>() {
901                   @Override
902                   public ClosingFuture<Object> call(DeferredCloser closer, Peeker peeker)
903                       throws Exception {
904                     expect.fail();
905                     throw new AssertionError();
906                   }
907                 },
908                 executor));
909   }
910 
testWhenAllSucceed2_call()911   public void testWhenAllSucceed2_call() throws ExecutionException, IOException {
912     ClosingFuture<TestCloseable> closingFuture =
913         ClosingFuture.whenAllSucceed(
914                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
915                 ClosingFuture.from(immediateFuture("value1")))
916             .call(
917                 new ClosingFunction2<TestCloseable, String, TestCloseable>() {
918                   @Override
919                   public TestCloseable apply(DeferredCloser closer, TestCloseable v1, String v2)
920                       throws Exception {
921                     assertThat(v1).isEqualTo(closeable1);
922                     assertThat(v2).isEqualTo("value1");
923                     assertStillOpen(closeable1);
924                     closer.eventuallyClose(closeable2, closingExecutor);
925                     return closeable2;
926                   }
927                 },
928                 executor);
929     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
930     waitUntilClosed(closingFuture);
931     assertClosed(closeable1, closeable2);
932   }
933 
testWhenAllSucceed2_call_failedInput()934   public void testWhenAllSucceed2_call_failedInput() throws ExecutionException, IOException {
935     ClosingFuture<Object> closingFuture =
936         ClosingFuture.whenAllSucceed(
937                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
938                 failedClosingFuture())
939             .call(
940                 new ClosingFunction2<TestCloseable, Object, Object>() {
941                   @Override
942                   public Object apply(DeferredCloser closer, TestCloseable v1, Object v2)
943                       throws Exception {
944                     expect.fail();
945                     throw new AssertionError();
946                   }
947                 },
948                 executor);
949     assertFinallyFailsWithException(closingFuture);
950     waitUntilClosed(closingFuture);
951     assertClosed(closeable1);
952   }
953 
testWhenAllSucceed2_call_cancelledPipeline()954   public void testWhenAllSucceed2_call_cancelledPipeline() throws Exception {
955     ClosingFuture<TestCloseable> closingFuture =
956         ClosingFuture.whenAllSucceed(
957                 ClosingFuture.from(immediateFuture(closeable1)),
958                 ClosingFuture.from(immediateFuture(closeable2)))
959             .call(
960                 waiter.waitFor(
961                     new ClosingFunction2<TestCloseable, TestCloseable, TestCloseable>() {
962                       @Override
963                       public TestCloseable apply(
964                           DeferredCloser closer, TestCloseable v1, TestCloseable v2)
965                           throws Exception {
966                         awaitUninterruptibly(futureCancelled);
967                         closer.eventuallyClose(closeable1, closingExecutor);
968                         closer.eventuallyClose(closeable2, closingExecutor);
969                         return closeable3;
970                       }
971                     }),
972                 executor);
973     waiter.awaitStarted();
974     cancelFinalStepAndWait(closingFuture);
975     // not closed until the function returns
976     assertStillOpen(closeable1, closeable2);
977     waiter.awaitReturned();
978     assertClosed(closeable1, closeable2);
979     assertStillOpen(closeable3);
980   }
981 
testWhenAllSucceed2_call_throws()982   public void testWhenAllSucceed2_call_throws() throws Exception {
983     ClosingFuture<Object> closingFuture =
984         ClosingFuture.whenAllSucceed(
985                 ClosingFuture.from(immediateFuture(closeable1)),
986                 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))
987             .call(
988                 new ClosingFunction2<TestCloseable, TestCloseable, Object>() {
989                   @Override
990                   public Object apply(DeferredCloser closer, TestCloseable v1, TestCloseable v2)
991                       throws Exception {
992                     closer.eventuallyClose(closeable3, closingExecutor);
993                     throw exception;
994                   }
995                 },
996                 executor);
997     assertFinallyFailsWithException(closingFuture);
998     waitUntilClosed(closingFuture);
999     assertStillOpen(closeable1);
1000     assertClosed(closeable2, closeable3);
1001   }
1002 
testWhenAllSucceed2_callAsync()1003   public void testWhenAllSucceed2_callAsync() throws ExecutionException, IOException {
1004     ClosingFuture<TestCloseable> closingFuture =
1005         ClosingFuture.whenAllSucceed(
1006                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1007                 ClosingFuture.from(immediateFuture("value1")))
1008             .callAsync(
1009                 new AsyncClosingFunction2<TestCloseable, String, TestCloseable>() {
1010                   @Override
1011                   public ClosingFuture<TestCloseable> apply(
1012                       DeferredCloser closer, TestCloseable v1, String v2) throws Exception {
1013                     assertThat(v1).isEqualTo(closeable1);
1014                     assertThat(v2).isEqualTo("value1");
1015                     assertStillOpen(closeable1);
1016                     closer.eventuallyClose(closeable2, closingExecutor);
1017                     return ClosingFuture.eventuallyClosing(
1018                         immediateFuture(closeable3), closingExecutor);
1019                   }
1020                 },
1021                 executor);
1022     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable3);
1023     waitUntilClosed(closingFuture);
1024     assertClosed(closeable1, closeable2, closeable3);
1025   }
1026 
testWhenAllSucceed2_callAsync_failedInput()1027   public void testWhenAllSucceed2_callAsync_failedInput() throws ExecutionException, IOException {
1028     ClosingFuture<Object> closingFuture =
1029         ClosingFuture.whenAllSucceed(
1030                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1031                 failedClosingFuture())
1032             .callAsync(
1033                 new AsyncClosingFunction2<TestCloseable, Object, Object>() {
1034                   @Override
1035                   public ClosingFuture<Object> apply(
1036                       DeferredCloser closer, TestCloseable v1, Object v2) throws Exception {
1037                     expect.fail();
1038                     throw new AssertionError();
1039                   }
1040                 },
1041                 executor);
1042     assertFinallyFailsWithException(closingFuture);
1043     waitUntilClosed(closingFuture);
1044     assertClosed(closeable1);
1045   }
1046 
testWhenAllSucceed2_callAsync_cancelledPipeline()1047   public void testWhenAllSucceed2_callAsync_cancelledPipeline() throws Exception {
1048     ClosingFuture<TestCloseable> closingFuture =
1049         ClosingFuture.whenAllSucceed(
1050                 ClosingFuture.from(immediateFuture(closeable1)),
1051                 ClosingFuture.from(immediateFuture(closeable2)))
1052             .callAsync(
1053                 waiter.waitFor(
1054                     new AsyncClosingFunction2<TestCloseable, TestCloseable, TestCloseable>() {
1055                       @Override
1056                       public ClosingFuture<TestCloseable> apply(
1057                           DeferredCloser closer, TestCloseable v1, TestCloseable v2)
1058                           throws Exception {
1059                         awaitUninterruptibly(futureCancelled);
1060                         closer.eventuallyClose(closeable1, closingExecutor);
1061                         closer.eventuallyClose(closeable2, closingExecutor);
1062                         return ClosingFuture.eventuallyClosing(
1063                             immediateFuture(closeable3), closingExecutor);
1064                       }
1065                     }),
1066                 executor);
1067     waiter.awaitStarted();
1068     cancelFinalStepAndWait(closingFuture);
1069     // not closed until the function returns
1070     assertStillOpen(closeable1, closeable2, closeable3);
1071     waiter.awaitReturned();
1072     assertClosed(closeable1, closeable2, closeable3);
1073   }
1074 
testWhenAllSucceed2_callAsync_throws()1075   public void testWhenAllSucceed2_callAsync_throws() throws Exception {
1076     ClosingFuture<Object> closingFuture =
1077         ClosingFuture.whenAllSucceed(
1078                 ClosingFuture.from(immediateFuture(closeable1)),
1079                 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))
1080             .callAsync(
1081                 new AsyncClosingFunction2<TestCloseable, TestCloseable, Object>() {
1082                   @Override
1083                   public ClosingFuture<Object> apply(
1084                       DeferredCloser closer, TestCloseable v1, TestCloseable v2) throws Exception {
1085                     closer.eventuallyClose(closeable3, closingExecutor);
1086                     throw exception;
1087                   }
1088                 },
1089                 executor);
1090     assertFinallyFailsWithException(closingFuture);
1091     waitUntilClosed(closingFuture);
1092     assertStillOpen(closeable1);
1093     assertClosed(closeable2, closeable3);
1094   }
1095 
testWhenAllSucceed3_call()1096   public void testWhenAllSucceed3_call() throws ExecutionException, IOException {
1097     ClosingFuture<TestCloseable> closingFuture =
1098         ClosingFuture.whenAllSucceed(
1099                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1100                 ClosingFuture.from(immediateFuture("value2")),
1101                 ClosingFuture.from(immediateFuture("value3")))
1102             .call(
1103                 new ClosingFunction3<TestCloseable, String, String, TestCloseable>() {
1104                   @Override
1105                   public TestCloseable apply(
1106                       DeferredCloser closer, TestCloseable v1, String v2, String v3)
1107                       throws Exception {
1108                     assertThat(v1).isEqualTo(closeable1);
1109                     assertThat(v2).isEqualTo("value2");
1110                     assertThat(v3).isEqualTo("value3");
1111                     assertStillOpen(closeable1);
1112                     closer.eventuallyClose(closeable2, closingExecutor);
1113                     return closeable2;
1114                   }
1115                 },
1116                 executor);
1117     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
1118     waitUntilClosed(closingFuture);
1119     assertClosed(closeable1, closeable2);
1120   }
1121 
testWhenAllSucceed3_call_failedInput()1122   public void testWhenAllSucceed3_call_failedInput() throws ExecutionException, IOException {
1123     ClosingFuture<Object> closingFuture =
1124         ClosingFuture.whenAllSucceed(
1125                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1126                 failedClosingFuture(),
1127                 ClosingFuture.from(immediateFuture("value3")))
1128             .call(
1129                 new ClosingFunction3<TestCloseable, Object, String, Object>() {
1130                   @Override
1131                   public Object apply(DeferredCloser closer, TestCloseable v1, Object v2, String v3)
1132                       throws Exception {
1133                     expect.fail();
1134                     throw new AssertionError();
1135                   }
1136                 },
1137                 executor);
1138     assertFinallyFailsWithException(closingFuture);
1139     waitUntilClosed(closingFuture);
1140     assertClosed(closeable1);
1141   }
1142 
testWhenAllSucceed3_call_cancelledPipeline()1143   public void testWhenAllSucceed3_call_cancelledPipeline() throws Exception {
1144     ClosingFuture<TestCloseable> closingFuture =
1145         ClosingFuture.whenAllSucceed(
1146                 ClosingFuture.from(immediateFuture(closeable1)),
1147                 ClosingFuture.from(immediateFuture(closeable2)),
1148                 ClosingFuture.from(immediateFuture("value3")))
1149             .call(
1150                 waiter.waitFor(
1151                     new ClosingFunction3<TestCloseable, TestCloseable, String, TestCloseable>() {
1152                       @Override
1153                       public TestCloseable apply(
1154                           DeferredCloser closer, TestCloseable v1, TestCloseable v2, String v3)
1155                           throws Exception {
1156                         awaitUninterruptibly(futureCancelled);
1157                         closer.eventuallyClose(closeable1, closingExecutor);
1158                         closer.eventuallyClose(closeable2, closingExecutor);
1159                         return closeable3;
1160                       }
1161                     }),
1162                 executor);
1163     waiter.awaitStarted();
1164     cancelFinalStepAndWait(closingFuture);
1165     // not closed until the function returns
1166     assertStillOpen(closeable1, closeable2);
1167     waiter.awaitReturned();
1168     assertClosed(closeable1, closeable2);
1169     assertStillOpen(closeable3);
1170   }
1171 
testWhenAllSucceed3_call_throws()1172   public void testWhenAllSucceed3_call_throws() throws Exception {
1173     ClosingFuture<Object> closingFuture =
1174         ClosingFuture.whenAllSucceed(
1175                 ClosingFuture.from(immediateFuture(closeable1)),
1176                 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor),
1177                 ClosingFuture.from(immediateFuture("value3")))
1178             .call(
1179                 new ClosingFunction3<TestCloseable, TestCloseable, String, Object>() {
1180                   @Override
1181                   public Object apply(
1182                       DeferredCloser closer, TestCloseable v1, TestCloseable v2, String v3)
1183                       throws Exception {
1184                     closer.eventuallyClose(closeable3, closingExecutor);
1185                     throw exception;
1186                   }
1187                 },
1188                 executor);
1189     assertFinallyFailsWithException(closingFuture);
1190     waitUntilClosed(closingFuture);
1191     assertStillOpen(closeable1);
1192     assertClosed(closeable2, closeable3);
1193   }
1194 
testWhenAllSucceed4_call()1195   public void testWhenAllSucceed4_call() throws ExecutionException, IOException {
1196     ClosingFuture<TestCloseable> closingFuture =
1197         ClosingFuture.whenAllSucceed(
1198                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1199                 ClosingFuture.from(immediateFuture("value2")),
1200                 ClosingFuture.from(immediateFuture("value3")),
1201                 ClosingFuture.from(immediateFuture("value4")))
1202             .call(
1203                 new ClosingFunction4<TestCloseable, String, String, String, TestCloseable>() {
1204                   @Override
1205                   public TestCloseable apply(
1206                       DeferredCloser closer, TestCloseable v1, String v2, String v3, String v4)
1207                       throws Exception {
1208                     assertThat(v1).isEqualTo(closeable1);
1209                     assertThat(v2).isEqualTo("value2");
1210                     assertThat(v3).isEqualTo("value3");
1211                     assertThat(v4).isEqualTo("value4");
1212                     assertStillOpen(closeable1);
1213                     closer.eventuallyClose(closeable2, closingExecutor);
1214                     return closeable2;
1215                   }
1216                 },
1217                 executor);
1218     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
1219     waitUntilClosed(closingFuture);
1220     assertClosed(closeable1, closeable2);
1221   }
1222 
testWhenAllSucceed4_call_failedInput()1223   public void testWhenAllSucceed4_call_failedInput() throws ExecutionException, IOException {
1224     ClosingFuture<Object> closingFuture =
1225         ClosingFuture.whenAllSucceed(
1226                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1227                 failedClosingFuture(),
1228                 ClosingFuture.from(immediateFuture("value3")),
1229                 ClosingFuture.from(immediateFuture("value4")))
1230             .call(
1231                 new ClosingFunction4<TestCloseable, Object, String, String, Object>() {
1232                   @Override
1233                   public Object apply(
1234                       DeferredCloser closer, TestCloseable v1, Object v2, String v3, String v4)
1235                       throws Exception {
1236                     expect.fail();
1237                     throw new AssertionError();
1238                   }
1239                 },
1240                 executor);
1241     assertFinallyFailsWithException(closingFuture);
1242     waitUntilClosed(closingFuture);
1243     assertClosed(closeable1);
1244   }
1245 
testWhenAllSucceed4_call_cancelledPipeline()1246   public void testWhenAllSucceed4_call_cancelledPipeline() throws Exception {
1247     ClosingFuture<TestCloseable> closingFuture =
1248         ClosingFuture.whenAllSucceed(
1249                 ClosingFuture.from(immediateFuture(closeable1)),
1250                 ClosingFuture.from(immediateFuture(closeable2)),
1251                 ClosingFuture.from(immediateFuture("value3")),
1252                 ClosingFuture.from(immediateFuture("value4")))
1253             .call(
1254                 waiter.waitFor(
1255                     new ClosingFunction4<
1256                         TestCloseable, TestCloseable, String, String, TestCloseable>() {
1257                       @Override
1258                       public TestCloseable apply(
1259                           DeferredCloser closer,
1260                           TestCloseable v1,
1261                           TestCloseable v2,
1262                           String v3,
1263                           String v4)
1264                           throws Exception {
1265                         awaitUninterruptibly(futureCancelled);
1266                         closer.eventuallyClose(closeable1, closingExecutor);
1267                         closer.eventuallyClose(closeable2, closingExecutor);
1268                         return closeable3;
1269                       }
1270                     }),
1271                 executor);
1272     waiter.awaitStarted();
1273     cancelFinalStepAndWait(closingFuture);
1274     // not closed until the function returns
1275     assertStillOpen(closeable1, closeable2);
1276     waiter.awaitReturned();
1277     assertClosed(closeable1, closeable2);
1278     assertStillOpen(closeable3);
1279   }
1280 
testWhenAllSucceed4_call_throws()1281   public void testWhenAllSucceed4_call_throws() throws Exception {
1282     ClosingFuture<Object> closingFuture =
1283         ClosingFuture.whenAllSucceed(
1284                 ClosingFuture.from(immediateFuture(closeable1)),
1285                 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor),
1286                 ClosingFuture.from(immediateFuture("value3")),
1287                 ClosingFuture.from(immediateFuture("value4")))
1288             .call(
1289                 new ClosingFunction4<TestCloseable, TestCloseable, String, String, Object>() {
1290                   @Override
1291                   public Object apply(
1292                       DeferredCloser closer,
1293                       TestCloseable v1,
1294                       TestCloseable v2,
1295                       String v3,
1296                       String v4)
1297                       throws Exception {
1298                     closer.eventuallyClose(closeable3, closingExecutor);
1299                     throw exception;
1300                   }
1301                 },
1302                 executor);
1303     assertFinallyFailsWithException(closingFuture);
1304     waitUntilClosed(closingFuture);
1305     assertStillOpen(closeable1);
1306     assertClosed(closeable2, closeable3);
1307   }
1308 
testWhenAllSucceed5_call()1309   public void testWhenAllSucceed5_call() throws ExecutionException, IOException {
1310     ClosingFuture<TestCloseable> closingFuture =
1311         ClosingFuture.whenAllSucceed(
1312                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1313                 ClosingFuture.from(immediateFuture("value2")),
1314                 ClosingFuture.from(immediateFuture("value3")),
1315                 ClosingFuture.from(immediateFuture("value4")),
1316                 ClosingFuture.from(immediateFuture("value5")))
1317             .call(
1318                 new ClosingFunction5<
1319                     TestCloseable, String, String, String, String, TestCloseable>() {
1320                   @Override
1321                   public TestCloseable apply(
1322                       DeferredCloser closer,
1323                       TestCloseable v1,
1324                       String v2,
1325                       String v3,
1326                       String v4,
1327                       String v5)
1328                       throws Exception {
1329                     assertThat(v1).isEqualTo(closeable1);
1330                     assertThat(v2).isEqualTo("value2");
1331                     assertThat(v3).isEqualTo("value3");
1332                     assertThat(v4).isEqualTo("value4");
1333                     assertThat(v5).isEqualTo("value5");
1334                     assertStillOpen(closeable1);
1335                     closer.eventuallyClose(closeable2, closingExecutor);
1336                     return closeable2;
1337                   }
1338                 },
1339                 executor);
1340     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
1341     waitUntilClosed(closingFuture);
1342     assertClosed(closeable1, closeable2);
1343   }
1344 
testWhenAllSucceed5_call_failedInput()1345   public void testWhenAllSucceed5_call_failedInput() throws ExecutionException, IOException {
1346     ClosingFuture<Object> closingFuture =
1347         ClosingFuture.whenAllSucceed(
1348                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1349                 failedClosingFuture(),
1350                 ClosingFuture.from(immediateFuture("value3")),
1351                 ClosingFuture.from(immediateFuture("value4")),
1352                 ClosingFuture.from(immediateFuture("value5")))
1353             .call(
1354                 new ClosingFunction5<TestCloseable, Object, String, String, String, Object>() {
1355                   @Override
1356                   public Object apply(
1357                       DeferredCloser closer,
1358                       TestCloseable v1,
1359                       Object v2,
1360                       String v3,
1361                       String v4,
1362                       String v5)
1363                       throws Exception {
1364                     expect.fail();
1365                     throw new AssertionError();
1366                   }
1367                 },
1368                 executor);
1369     assertFinallyFailsWithException(closingFuture);
1370     waitUntilClosed(closingFuture);
1371     assertClosed(closeable1);
1372   }
1373 
testWhenAllSucceed5_call_cancelledPipeline()1374   public void testWhenAllSucceed5_call_cancelledPipeline() throws Exception {
1375     ClosingFuture<TestCloseable> closingFuture =
1376         ClosingFuture.whenAllSucceed(
1377                 ClosingFuture.from(immediateFuture(closeable1)),
1378                 ClosingFuture.from(immediateFuture(closeable2)),
1379                 ClosingFuture.from(immediateFuture("value3")),
1380                 ClosingFuture.from(immediateFuture("value4")),
1381                 ClosingFuture.from(immediateFuture("value5")))
1382             .call(
1383                 waiter.waitFor(
1384                     new ClosingFunction5<
1385                         TestCloseable, TestCloseable, String, String, String, TestCloseable>() {
1386                       @Override
1387                       public TestCloseable apply(
1388                           DeferredCloser closer,
1389                           TestCloseable v1,
1390                           TestCloseable v2,
1391                           String v3,
1392                           String v4,
1393                           String v5)
1394                           throws Exception {
1395                         awaitUninterruptibly(futureCancelled);
1396                         closer.eventuallyClose(closeable1, closingExecutor);
1397                         closer.eventuallyClose(closeable2, closingExecutor);
1398                         return closeable3;
1399                       }
1400                     }),
1401                 executor);
1402     waiter.awaitStarted();
1403     cancelFinalStepAndWait(closingFuture);
1404     // not closed until the function returns
1405     assertStillOpen(closeable1, closeable2);
1406     waiter.awaitReturned();
1407     assertClosed(closeable1, closeable2);
1408     assertStillOpen(closeable3);
1409   }
1410 
testWhenAllSucceed5_call_throws()1411   public void testWhenAllSucceed5_call_throws() throws Exception {
1412     ClosingFuture<Object> closingFuture =
1413         ClosingFuture.whenAllSucceed(
1414                 ClosingFuture.from(immediateFuture(closeable1)),
1415                 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor),
1416                 ClosingFuture.from(immediateFuture("value3")),
1417                 ClosingFuture.from(immediateFuture("value4")),
1418                 ClosingFuture.from(immediateFuture("value5")))
1419             .call(
1420                 new ClosingFunction5<
1421                     TestCloseable, TestCloseable, String, String, String, Object>() {
1422                   @Override
1423                   public Object apply(
1424                       DeferredCloser closer,
1425                       TestCloseable v1,
1426                       TestCloseable v2,
1427                       String v3,
1428                       String v4,
1429                       String v5)
1430                       throws Exception {
1431                     closer.eventuallyClose(closeable3, closingExecutor);
1432                     throw exception;
1433                   }
1434                 },
1435                 executor);
1436     assertFinallyFailsWithException(closingFuture);
1437     waitUntilClosed(closingFuture);
1438     assertStillOpen(closeable1);
1439     assertClosed(closeable2, closeable3);
1440   }
1441 
testTransform_preventsFurtherOperations()1442   public void testTransform_preventsFurtherOperations() {
1443     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1444     ClosingFuture<String> unused =
1445         closingFuture.transform(
1446             new ClosingFunction<String, String>() {
1447               @Override
1448               public String apply(DeferredCloser closer, String v) throws Exception {
1449                 return "value2";
1450               }
1451             },
1452             executor);
1453     assertDerivingThrowsIllegalStateException(closingFuture);
1454     assertFinalStepThrowsIllegalStateException(closingFuture);
1455   }
1456 
testTransformAsync_preventsFurtherOperations()1457   public void testTransformAsync_preventsFurtherOperations() {
1458     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1459     ClosingFuture<String> unused =
1460         closingFuture.transformAsync(
1461             new AsyncClosingFunction<String, String>() {
1462               @Override
1463               public ClosingFuture<String> apply(DeferredCloser closer, String v) throws Exception {
1464                 return ClosingFuture.from(immediateFuture("value2"));
1465               }
1466             },
1467             executor);
1468     assertDerivingThrowsIllegalStateException(closingFuture);
1469     assertFinalStepThrowsIllegalStateException(closingFuture);
1470   }
1471 
testCatching_preventsFurtherOperations()1472   public void testCatching_preventsFurtherOperations() {
1473     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1474     ClosingFuture<String> unused =
1475         closingFuture.catching(
1476             Exception.class,
1477             new ClosingFunction<Exception, String>() {
1478               @Override
1479               public String apply(DeferredCloser closer, Exception x) throws Exception {
1480                 return "value2";
1481               }
1482             },
1483             executor);
1484     assertDerivingThrowsIllegalStateException(closingFuture);
1485     assertFinalStepThrowsIllegalStateException(closingFuture);
1486   }
1487 
testCatchingAsync_preventsFurtherOperations()1488   public void testCatchingAsync_preventsFurtherOperations() {
1489     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1490     ClosingFuture<String> unused =
1491         closingFuture.catchingAsync(
1492             Exception.class,
1493             ClosingFuture.withoutCloser(
1494                 new AsyncFunction<Exception, String>() {
1495                   @Override
1496                   public ListenableFuture<String> apply(Exception x) throws Exception {
1497                     return immediateFuture("value2");
1498                   }
1499                 }),
1500             executor);
1501     assertDerivingThrowsIllegalStateException(closingFuture);
1502     assertFinalStepThrowsIllegalStateException(closingFuture);
1503   }
1504 
testWhenAllComplete_preventsFurtherOperations()1505   public void testWhenAllComplete_preventsFurtherOperations() {
1506     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1507     Combiner unused = ClosingFuture.whenAllComplete(asList(closingFuture));
1508     assertDerivingThrowsIllegalStateException(closingFuture);
1509     assertFinalStepThrowsIllegalStateException(closingFuture);
1510   }
1511 
testWhenAllSucceed_preventsFurtherOperations()1512   public void testWhenAllSucceed_preventsFurtherOperations() {
1513     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1514     Combiner unused = ClosingFuture.whenAllSucceed(asList(closingFuture));
1515     assertDerivingThrowsIllegalStateException(closingFuture);
1516     assertFinalStepThrowsIllegalStateException(closingFuture);
1517   }
1518 
assertDerivingThrowsIllegalStateException( ClosingFuture<String> closingFuture)1519   protected final void assertDerivingThrowsIllegalStateException(
1520       ClosingFuture<String> closingFuture) {
1521     try {
1522       closingFuture.transform(
1523           new ClosingFunction<String, String>() {
1524             @Override
1525             public String apply(DeferredCloser closer3, String v1) throws Exception {
1526               return "value3";
1527             }
1528           },
1529           executor);
1530       fail();
1531     } catch (IllegalStateException expected5) {
1532     }
1533     try {
1534       closingFuture.transformAsync(
1535           new AsyncClosingFunction<String, String>() {
1536             @Override
1537             public ClosingFuture<String> apply(DeferredCloser closer2, String v) throws Exception {
1538               return ClosingFuture.from(immediateFuture("value3"));
1539             }
1540           },
1541           executor);
1542       fail();
1543     } catch (IllegalStateException expected4) {
1544     }
1545     try {
1546       closingFuture.catching(
1547           Exception.class,
1548           new ClosingFunction<Exception, String>() {
1549             @Override
1550             public String apply(DeferredCloser closer1, Exception x1) throws Exception {
1551               return "value3";
1552             }
1553           },
1554           executor);
1555       fail();
1556     } catch (IllegalStateException expected3) {
1557     }
1558     try {
1559       closingFuture.catchingAsync(
1560           Exception.class,
1561           new AsyncClosingFunction<Exception, String>() {
1562             @Override
1563             public ClosingFuture<String> apply(DeferredCloser closer, Exception x)
1564                 throws Exception {
1565               return ClosingFuture.from(immediateFuture("value3"));
1566             }
1567           },
1568           executor);
1569       fail();
1570     } catch (IllegalStateException expected2) {
1571     }
1572     try {
1573       ClosingFuture.whenAllComplete(asList(closingFuture));
1574       fail();
1575     } catch (IllegalStateException expected1) {
1576     }
1577     try {
1578       ClosingFuture.whenAllSucceed(asList(closingFuture));
1579       fail();
1580     } catch (IllegalStateException expected) {
1581     }
1582   }
1583 
1584   /** Asserts that marking this step a final step throws {@link IllegalStateException}. */
assertFinalStepThrowsIllegalStateException(ClosingFuture<?> closingFuture)1585   protected void assertFinalStepThrowsIllegalStateException(ClosingFuture<?> closingFuture) {
1586     try {
1587       closingFuture.finishToFuture();
1588       fail();
1589     } catch (IllegalStateException expected) {
1590     }
1591     try {
1592       closingFuture.finishToValueAndCloser(new NoOpValueAndCloserConsumer<>(), executor);
1593       fail();
1594     } catch (IllegalStateException expected) {
1595     }
1596   }
1597 
1598   // Avoid infinite recursion if a closeable's close() method throws RejectedExecutionException and
1599   // is closed using the direct executor.
testCloseThrowsRejectedExecutionException()1600   public void testCloseThrowsRejectedExecutionException() throws Exception {
1601     doThrow(new RejectedExecutionException()).when(mockCloseable).close();
1602     ClosingFuture<Closeable> closingFuture =
1603         ClosingFuture.submit(
1604             new ClosingCallable<Closeable>() {
1605               @Override
1606               public Closeable call(DeferredCloser closer) throws Exception {
1607                 return closer.eventuallyClose(mockCloseable, directExecutor());
1608               }
1609             },
1610             executor);
1611     assertThat(getFinalValue(closingFuture)).isEqualTo(mockCloseable);
1612     waitUntilClosed(closingFuture);
1613     verify(mockCloseable, timeout(1000)).close();
1614   }
1615 
1616   /**
1617    * Marks the given step final, waits for it to be finished, and returns the value.
1618    *
1619    * @throws ExecutionException if the step failed
1620    * @throws CancellationException if the step was cancelled
1621    */
getFinalValue(ClosingFuture<T> closingFuture)1622   abstract <T> T getFinalValue(ClosingFuture<T> closingFuture) throws ExecutionException;
1623 
1624   /** Marks the given step final, cancels it, and waits for the cancellation to happen. */
cancelFinalStepAndWait(ClosingFuture<TestCloseable> closingFuture)1625   abstract void cancelFinalStepAndWait(ClosingFuture<TestCloseable> closingFuture);
1626 
1627   /**
1628    * Marks the given step final and waits for it to fail. Expects the failure exception to match
1629    * {@link AbstractClosingFutureTest#exception}.
1630    */
assertFinallyFailsWithException(ClosingFuture<?> closingFuture)1631   abstract void assertFinallyFailsWithException(ClosingFuture<?> closingFuture);
1632 
1633   /** Waits for the given step to be canceled. */
assertBecomesCanceled(ClosingFuture<?> closingFuture)1634   abstract void assertBecomesCanceled(ClosingFuture<?> closingFuture) throws ExecutionException;
1635 
1636   /** Waits for the given step's closeables to be closed. */
waitUntilClosed(ClosingFuture<?> closingFuture)1637   void waitUntilClosed(ClosingFuture<?> closingFuture) {
1638     assertTrue(awaitUninterruptibly(closingFuture.whenClosedCountDown(), 1, SECONDS));
1639   }
1640 
assertThatFutureFailsWithException(Future<?> future)1641   void assertThatFutureFailsWithException(Future<?> future) {
1642     try {
1643       getUninterruptibly(future);
1644       fail("Expected future to fail: " + future);
1645     } catch (ExecutionException e) {
1646       assertThat(e).hasCauseThat().isSameInstanceAs(exception);
1647     }
1648   }
1649 
assertThatFutureBecomesCancelled(Future<?> future)1650   static void assertThatFutureBecomesCancelled(Future<?> future) throws ExecutionException {
1651     try {
1652       getUninterruptibly(future);
1653       fail("Expected future to be canceled: " + future);
1654     } catch (CancellationException expected) {
1655     }
1656   }
1657 
assertStillOpen(TestCloseable closeable1, TestCloseable... moreCloseables)1658   private static void assertStillOpen(TestCloseable closeable1, TestCloseable... moreCloseables)
1659       throws IOException {
1660     for (TestCloseable closeable : asList(closeable1, moreCloseables)) {
1661       assertWithMessage("%s.stillOpen()", closeable).that(closeable.stillOpen()).isTrue();
1662     }
1663   }
1664 
assertClosed(TestCloseable closeable1, TestCloseable... moreCloseables)1665   static void assertClosed(TestCloseable closeable1, TestCloseable... moreCloseables)
1666       throws IOException {
1667     for (TestCloseable closeable : asList(closeable1, moreCloseables)) {
1668       assertWithMessage("%s.isClosed()", closeable).that(closeable.awaitClosed()).isTrue();
1669     }
1670   }
1671 
failedClosingFuture()1672   private ClosingFuture<Object> failedClosingFuture() {
1673     return ClosingFuture.from(immediateFailedFuture(exception));
1674   }
1675 
assertNoExpectedFailures()1676   private void assertNoExpectedFailures() {
1677     assertWithMessage("executor was shut down")
1678         .that(shutdownAndAwaitTermination(executor, 10, SECONDS))
1679         .isTrue();
1680     assertWithMessage("closingExecutor was shut down")
1681         .that(shutdownAndAwaitTermination(closingExecutor, 10, SECONDS))
1682         .isTrue();
1683     if (!failures.isEmpty()) {
1684       StringWriter message = new StringWriter();
1685       PrintWriter writer = new PrintWriter(message);
1686       writer.println("Expected no failures, but found:");
1687       for (AssertionError failure : failures) {
1688         failure.printStackTrace(writer);
1689       }
1690       failures.clear();
1691       assertWithMessage(message.toString()).fail();
1692     }
1693   }
1694 
1695   static final class TestCloseable implements Closeable {
1696     private final CountDownLatch latch = new CountDownLatch(1);
1697     private final String name;
1698 
TestCloseable(String name)1699     TestCloseable(String name) {
1700       this.name = name;
1701     }
1702 
1703     @Override
close()1704     public void close() throws IOException {
1705       latch.countDown();
1706     }
1707 
awaitClosed()1708     boolean awaitClosed() {
1709       return awaitUninterruptibly(latch, 10, SECONDS);
1710     }
1711 
stillOpen()1712     boolean stillOpen() {
1713       return !awaitUninterruptibly(latch, 1, SECONDS);
1714     }
1715 
1716     @Override
toString()1717     public String toString() {
1718       return name;
1719     }
1720   }
1721 
1722   static final class Waiter {
1723     private final CountDownLatch started = new CountDownLatch(1);
1724     private final CountDownLatch canReturn = new CountDownLatch(1);
1725     private final CountDownLatch returned = new CountDownLatch(1);
1726     private Object proxy;
1727 
waitFor(Callable<V> callable)1728     <V> Callable<V> waitFor(Callable<V> callable) {
1729       return waitFor(callable, Callable.class);
1730     }
1731 
waitFor(ClosingCallable<V> closingCallable)1732     <V> ClosingCallable<V> waitFor(ClosingCallable<V> closingCallable) {
1733       return waitFor(closingCallable, ClosingCallable.class);
1734     }
1735 
waitFor(AsyncClosingCallable<V> asyncClosingCallable)1736     <V> AsyncClosingCallable<V> waitFor(AsyncClosingCallable<V> asyncClosingCallable) {
1737       return waitFor(asyncClosingCallable, AsyncClosingCallable.class);
1738     }
1739 
waitFor(ClosingFunction<T, U> closingFunction)1740     <T, U> ClosingFunction<T, U> waitFor(ClosingFunction<T, U> closingFunction) {
1741       return waitFor(closingFunction, ClosingFunction.class);
1742     }
1743 
waitFor(AsyncClosingFunction<T, U> asyncClosingFunction)1744     <T, U> AsyncClosingFunction<T, U> waitFor(AsyncClosingFunction<T, U> asyncClosingFunction) {
1745       return waitFor(asyncClosingFunction, AsyncClosingFunction.class);
1746     }
1747 
waitFor(CombiningCallable<V> combiningCallable)1748     <V> CombiningCallable<V> waitFor(CombiningCallable<V> combiningCallable) {
1749       return waitFor(combiningCallable, CombiningCallable.class);
1750     }
1751 
waitFor(AsyncCombiningCallable<V> asyncCombiningCallable)1752     <V> AsyncCombiningCallable<V> waitFor(AsyncCombiningCallable<V> asyncCombiningCallable) {
1753       return waitFor(asyncCombiningCallable, AsyncCombiningCallable.class);
1754     }
1755 
waitFor(ClosingFunction2<V1, V2, U> closingFunction2)1756     <V1, V2, U> ClosingFunction2<V1, V2, U> waitFor(ClosingFunction2<V1, V2, U> closingFunction2) {
1757       return waitFor(closingFunction2, ClosingFunction2.class);
1758     }
1759 
waitFor( AsyncClosingFunction2<V1, V2, U> asyncClosingFunction2)1760     <V1, V2, U> AsyncClosingFunction2<V1, V2, U> waitFor(
1761         AsyncClosingFunction2<V1, V2, U> asyncClosingFunction2) {
1762       return waitFor(asyncClosingFunction2, AsyncClosingFunction2.class);
1763     }
1764 
waitFor( ClosingFunction3<V1, V2, V3, U> closingFunction3)1765     <V1, V2, V3, U> ClosingFunction3<V1, V2, V3, U> waitFor(
1766         ClosingFunction3<V1, V2, V3, U> closingFunction3) {
1767       return waitFor(closingFunction3, ClosingFunction3.class);
1768     }
1769 
waitFor( ClosingFunction4<V1, V2, V3, V4, U> closingFunction4)1770     <V1, V2, V3, V4, U> ClosingFunction4<V1, V2, V3, V4, U> waitFor(
1771         ClosingFunction4<V1, V2, V3, V4, U> closingFunction4) {
1772       return waitFor(closingFunction4, ClosingFunction4.class);
1773     }
1774 
waitFor( ClosingFunction5<V1, V2, V3, V4, V5, U> closingFunction5)1775     <V1, V2, V3, V4, V5, U> ClosingFunction5<V1, V2, V3, V4, V5, U> waitFor(
1776         ClosingFunction5<V1, V2, V3, V4, V5, U> closingFunction5) {
1777       return waitFor(closingFunction5, ClosingFunction5.class);
1778     }
1779 
waitFor(final T delegate, final Class<T> type)1780     <T> T waitFor(final T delegate, final Class<T> type) {
1781       checkState(proxy == null);
1782       T proxyObject =
1783           Reflection.newProxy(
1784               type,
1785               new InvocationHandler() {
1786                 @Override
1787                 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
1788                   if (!method.getDeclaringClass().equals(type)) {
1789                     return method.invoke(delegate, args);
1790                   }
1791                   checkState(started.getCount() == 1);
1792                   started.countDown();
1793                   try {
1794                     return method.invoke(delegate, args);
1795                   } catch (InvocationTargetException e) {
1796                     throw e.getCause();
1797                   } finally {
1798                     awaitUninterruptibly(canReturn);
1799                     returned.countDown();
1800                   }
1801                 }
1802               });
1803       this.proxy = proxyObject;
1804       return proxyObject;
1805     }
1806 
awaitStarted()1807     void awaitStarted() {
1808       assertTrue(awaitUninterruptibly(started, 10, SECONDS));
1809     }
1810 
awaitReturned()1811     void awaitReturned() {
1812       canReturn.countDown();
1813       assertTrue(awaitUninterruptibly(returned, 10, SECONDS));
1814     }
1815   }
1816 
1817   static final class NoOpValueAndCloserConsumer<V> implements ValueAndCloserConsumer<V> {
1818     @Override
accept(ValueAndCloser<V> valueAndCloser)1819     public void accept(ValueAndCloser<V> valueAndCloser) {}
1820   }
1821 }
1822