• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkState;
20 import static com.google.common.collect.Lists.asList;
21 import static com.google.common.truth.Truth.assertThat;
22 import static com.google.common.truth.Truth.assertWithMessage;
23 import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
24 import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
25 import static com.google.common.util.concurrent.Futures.immediateFuture;
26 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
27 import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination;
28 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
29 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
30 import static java.util.Arrays.asList;
31 import static java.util.concurrent.Executors.newSingleThreadExecutor;
32 import static java.util.concurrent.TimeUnit.SECONDS;
33 import static org.junit.Assert.assertThrows;
34 import static org.mockito.Mockito.doThrow;
35 import static org.mockito.Mockito.timeout;
36 import static org.mockito.Mockito.verify;
37 
38 import com.google.common.collect.ImmutableList;
39 import com.google.common.reflect.Reflection;
40 import com.google.common.truth.FailureStrategy;
41 import com.google.common.truth.StandardSubjectBuilder;
42 import com.google.common.util.concurrent.ClosingFuture.AsyncClosingCallable;
43 import com.google.common.util.concurrent.ClosingFuture.AsyncClosingFunction;
44 import com.google.common.util.concurrent.ClosingFuture.ClosingCallable;
45 import com.google.common.util.concurrent.ClosingFuture.ClosingFunction;
46 import com.google.common.util.concurrent.ClosingFuture.Combiner;
47 import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
48 import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
49 import com.google.common.util.concurrent.ClosingFuture.Combiner2.AsyncClosingFunction2;
50 import com.google.common.util.concurrent.ClosingFuture.Combiner2.ClosingFunction2;
51 import com.google.common.util.concurrent.ClosingFuture.Combiner3.ClosingFunction3;
52 import com.google.common.util.concurrent.ClosingFuture.Combiner4.ClosingFunction4;
53 import com.google.common.util.concurrent.ClosingFuture.Combiner5.ClosingFunction5;
54 import com.google.common.util.concurrent.ClosingFuture.DeferredCloser;
55 import com.google.common.util.concurrent.ClosingFuture.Peeker;
56 import com.google.common.util.concurrent.ClosingFuture.ValueAndCloser;
57 import com.google.common.util.concurrent.ClosingFuture.ValueAndCloserConsumer;
58 import java.io.Closeable;
59 import java.io.IOException;
60 import java.io.PrintWriter;
61 import java.io.StringWriter;
62 import java.lang.reflect.InvocationHandler;
63 import java.lang.reflect.InvocationTargetException;
64 import java.lang.reflect.Method;
65 import java.util.ArrayList;
66 import java.util.List;
67 import java.util.concurrent.Callable;
68 import java.util.concurrent.CancellationException;
69 import java.util.concurrent.CountDownLatch;
70 import java.util.concurrent.ExecutionException;
71 import java.util.concurrent.Executor;
72 import java.util.concurrent.ExecutorService;
73 import java.util.concurrent.Future;
74 import java.util.concurrent.RejectedExecutionException;
75 import java.util.concurrent.atomic.AtomicReference;
76 import junit.framework.TestCase;
77 import org.mockito.Mockito;
78 
79 /**
80  * Tests for {@link ClosingFuture}. Subclasses exercise either the {@link
81  * ClosingFuture#finishToFuture()} or {@link
82  * ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} paths to complete a
83  * {@link ClosingFuture} pipeline.
84  */
85 public abstract class AbstractClosingFutureTest extends TestCase {
86   // TODO(dpb): Use Expect once that supports JUnit 3, or we can use JUnit 4.
87   final List<AssertionError> failures = new ArrayList<>();
88   final StandardSubjectBuilder expect =
89       StandardSubjectBuilder.forCustomFailureStrategy(
90           new FailureStrategy() {
91             @Override
92             public void fail(AssertionError failure) {
93               failures.add(failure);
94             }
95           });
96 
97   final ListeningExecutorService executor =
98       MoreExecutors.listeningDecorator(newSingleThreadExecutor());
99   final ExecutorService closingExecutor = newSingleThreadExecutor();
100 
101   final TestCloseable closeable1 = new TestCloseable("closeable1");
102   final TestCloseable closeable2 = new TestCloseable("closeable2");
103   final TestCloseable closeable3 = new TestCloseable("closeable3");
104   final TestCloseable closeable4 = new TestCloseable("closeable4");
105 
106   final Waiter waiter = new Waiter();
107   final CountDownLatch futureCancelled = new CountDownLatch(1);
108   final Exception exception = new Exception();
109   final Closeable mockCloseable = Mockito.mock(Closeable.class);
110 
111   @Override
tearDown()112   protected void tearDown() throws Exception {
113     assertNoExpectedFailures();
114     super.tearDown();
115   }
116 
testFrom()117   public void testFrom() throws Exception {
118     ClosingFuture<String> closingFuture =
119         ClosingFuture.from(executor.submit(Callables.returning(closeable1)))
120             .transform(
121                 new ClosingFunction<TestCloseable, String>() {
122                   @Override
123                   public String apply(DeferredCloser closer, TestCloseable v) throws Exception {
124                     assertThat(v).isSameInstanceAs(closeable1);
125                     return "value";
126                   }
127                 },
128                 executor);
129     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
130     waitUntilClosed(closingFuture);
131     assertStillOpen(closeable1);
132   }
133 
testFrom_failedInput()134   public void testFrom_failedInput() throws Exception {
135     assertFinallyFailsWithException(failedClosingFuture());
136   }
137 
testFrom_cancelledInput()138   public void testFrom_cancelledInput() throws Exception {
139     assertBecomesCanceled(ClosingFuture.from(immediateCancelledFuture()));
140   }
141 
testEventuallyClosing()142   public void testEventuallyClosing() throws Exception {
143     ClosingFuture<String> closingFuture =
144         ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor)
145             .transform(
146                 new ClosingFunction<TestCloseable, String>() {
147                   @Override
148                   public String apply(DeferredCloser closer, TestCloseable v) throws Exception {
149                     assertThat(v).isSameInstanceAs(closeable1);
150                     assertStillOpen(closeable1);
151                     return "value";
152                   }
153                 },
154                 executor);
155     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
156     waitUntilClosed(closingFuture);
157     assertClosed(closeable1);
158   }
159 
testEventuallyClosing_failedInput()160   public void testEventuallyClosing_failedInput() throws Exception {
161     assertFinallyFailsWithException(
162         ClosingFuture.eventuallyClosing(
163             Futures.<Closeable>immediateFailedFuture(exception), closingExecutor));
164   }
165 
testEventuallyClosing_cancelledInput()166   public void testEventuallyClosing_cancelledInput() throws Exception {
167     assertBecomesCanceled(
168         ClosingFuture.eventuallyClosing(
169             Futures.<Closeable>immediateCancelledFuture(), closingExecutor));
170   }
171 
testEventuallyClosing_cancelledPipeline()172   public void testEventuallyClosing_cancelledPipeline() throws Exception {
173     ClosingFuture<TestCloseable> closingFuture =
174         ClosingFuture.eventuallyClosing(
175             executor.submit(
176                 waiter.waitFor(
177                     new Callable<TestCloseable>() {
178                       @Override
179                       public TestCloseable call() throws InterruptedException {
180                         awaitUninterruptibly(futureCancelled);
181                         return closeable1;
182                       }
183                     })),
184             closingExecutor);
185     waiter.awaitStarted();
186     cancelFinalStepAndWait(closingFuture);
187     // not closed until the callable returns
188     assertStillOpen(closeable1);
189     waiter.awaitReturned();
190     assertClosed(closeable1);
191   }
192 
testEventuallyClosing_throws()193   public void testEventuallyClosing_throws() throws Exception {
194     assertFinallyFailsWithException(
195         ClosingFuture.eventuallyClosing(
196             executor.submit(
197                 new Callable<TestCloseable>() {
198                   @Override
199                   public TestCloseable call() throws Exception {
200                     throw exception;
201                   }
202                 }),
203             closingExecutor));
204   }
205 
testSubmit()206   public void testSubmit() throws Exception {
207     ClosingFuture<String> closingFuture =
208         ClosingFuture.submit(
209                 new ClosingCallable<TestCloseable>() {
210                   @Override
211                   public TestCloseable call(DeferredCloser closer) throws Exception {
212                     closer.eventuallyClose(closeable1, closingExecutor);
213                     closer.eventuallyClose(closeable2, closingExecutor);
214                     return closeable3;
215                   }
216                 },
217                 executor)
218             .transform(
219                 new ClosingFunction<TestCloseable, String>() {
220                   @Override
221                   public String apply(DeferredCloser closer, TestCloseable v) throws Exception {
222                     assertThat(v).isSameInstanceAs(closeable3);
223                     assertStillOpen(closeable1, closeable2, closeable3);
224                     return "value";
225                   }
226                 },
227                 executor);
228     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
229     waitUntilClosed(closingFuture);
230     assertClosed(closeable1, closeable2);
231     assertStillOpen(closeable3);
232   }
233 
testSubmit_cancelledPipeline()234   public void testSubmit_cancelledPipeline() throws Exception {
235     ClosingFuture<TestCloseable> closingFuture =
236         ClosingFuture.submit(
237             waiter.waitFor(
238                 new ClosingCallable<TestCloseable>() {
239                   @Override
240                   public TestCloseable call(DeferredCloser closer) throws Exception {
241                     awaitUninterruptibly(futureCancelled);
242                     closer.eventuallyClose(closeable1, closingExecutor);
243                     closer.eventuallyClose(closeable2, closingExecutor);
244                     return closeable3;
245                   }
246                 }),
247             executor);
248     waiter.awaitStarted();
249     cancelFinalStepAndWait(closingFuture);
250     waiter.awaitReturned();
251     assertClosed(closeable1, closeable2);
252     assertStillOpen(closeable3);
253   }
254 
testSubmit_throws()255   public void testSubmit_throws() throws Exception {
256     ClosingFuture<Object> closingFuture =
257         ClosingFuture.submit(
258             new ClosingCallable<Object>() {
259               @Override
260               public Object call(DeferredCloser closer) throws Exception {
261                 closer.eventuallyClose(closeable1, closingExecutor);
262                 closer.eventuallyClose(closeable2, closingExecutor);
263                 throw exception;
264               }
265             },
266             executor);
267     assertFinallyFailsWithException(closingFuture);
268     waitUntilClosed(closingFuture);
269     assertClosed(closeable1, closeable2);
270   }
271 
testSubmitAsync()272   public void testSubmitAsync() throws Exception {
273     ClosingFuture<TestCloseable> closingFuture =
274         ClosingFuture.submitAsync(
275             new AsyncClosingCallable<TestCloseable>() {
276               @Override
277               public ClosingFuture<TestCloseable> call(DeferredCloser closer) {
278                 closer.eventuallyClose(closeable1, closingExecutor);
279                 return ClosingFuture.submit(
280                     new ClosingCallable<TestCloseable>() {
281                       @Override
282                       public TestCloseable call(DeferredCloser deferredCloser) throws Exception {
283                         return closeable2;
284                       }
285                     },
286                     directExecutor());
287               }
288             },
289             executor);
290     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
291     waitUntilClosed(closingFuture);
292     assertClosed(closeable1);
293     assertStillOpen(closeable2);
294   }
295 
testSubmitAsync_cancelledPipeline()296   public void testSubmitAsync_cancelledPipeline() throws Exception {
297     ClosingFuture<TestCloseable> closingFuture =
298         ClosingFuture.submitAsync(
299             waiter.waitFor(
300                 new AsyncClosingCallable<TestCloseable>() {
301                   @Override
302                   public ClosingFuture<TestCloseable> call(DeferredCloser closer) throws Exception {
303                     awaitUninterruptibly(futureCancelled);
304                     closer.eventuallyClose(closeable1, closingExecutor);
305                     closer.eventuallyClose(closeable2, closingExecutor);
306                     return ClosingFuture.submit(
307                         new ClosingCallable<TestCloseable>() {
308                           @Override
309                           public TestCloseable call(DeferredCloser deferredCloser)
310                               throws Exception {
311                             deferredCloser.eventuallyClose(closeable3, closingExecutor);
312                             return closeable3;
313                           }
314                         },
315                         directExecutor());
316                   }
317                 }),
318             executor);
319     waiter.awaitStarted();
320     cancelFinalStepAndWait(closingFuture);
321     waiter.awaitReturned();
322     assertClosed(closeable1, closeable2, closeable3);
323   }
324 
testSubmitAsync_throws()325   public void testSubmitAsync_throws() throws Exception {
326     ClosingFuture<Object> closingFuture =
327         ClosingFuture.submitAsync(
328             new AsyncClosingCallable<Object>() {
329               @Override
330               public ClosingFuture<Object> call(DeferredCloser closer) throws Exception {
331                 closer.eventuallyClose(closeable1, closingExecutor);
332                 closer.eventuallyClose(closeable2, closingExecutor);
333                 throw exception;
334               }
335             },
336             executor);
337     assertFinallyFailsWithException(closingFuture);
338     waitUntilClosed(closingFuture);
339     assertClosed(closeable1, closeable2);
340   }
341 
testStatusFuture()342   public void testStatusFuture() throws Exception {
343     ClosingFuture<String> closingFuture =
344         ClosingFuture.submit(
345             waiter.waitFor(
346                 new ClosingCallable<String>() {
347                   @Override
348                   public String call(DeferredCloser closer) throws Exception {
349                     return "value";
350                   }
351                 }),
352             executor);
353     ListenableFuture<?> statusFuture = closingFuture.statusFuture();
354     waiter.awaitStarted();
355     assertThat(statusFuture.isDone()).isFalse();
356     waiter.awaitReturned();
357     assertThat(getUninterruptibly(statusFuture)).isNull();
358   }
359 
testStatusFuture_failure()360   public void testStatusFuture_failure() throws Exception {
361     ClosingFuture<String> closingFuture =
362         ClosingFuture.submit(
363             waiter.waitFor(
364                 new ClosingCallable<String>() {
365                   @Override
366                   public String call(DeferredCloser closer) throws Exception {
367                     throw exception;
368                   }
369                 }),
370             executor);
371     ListenableFuture<?> statusFuture = closingFuture.statusFuture();
372     waiter.awaitStarted();
373     assertThat(statusFuture.isDone()).isFalse();
374     waiter.awaitReturned();
375     assertThatFutureFailsWithException(statusFuture);
376   }
377 
testStatusFuture_cancelDoesNothing()378   public void testStatusFuture_cancelDoesNothing() throws Exception {
379     ClosingFuture<String> closingFuture =
380         ClosingFuture.submit(
381             waiter.waitFor(
382                 new ClosingCallable<String>() {
383                   @Override
384                   public String call(DeferredCloser closer) throws Exception {
385                     return "value";
386                   }
387                 }),
388             executor);
389     ListenableFuture<?> statusFuture = closingFuture.statusFuture();
390     waiter.awaitStarted();
391     assertThat(statusFuture.isDone()).isFalse();
392     statusFuture.cancel(true);
393     assertThat(statusFuture.isCancelled()).isTrue();
394     waiter.awaitReturned();
395     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
396   }
397 
testCancel_caught()398   public void testCancel_caught() throws Exception {
399     ClosingFuture<String> step0 = ClosingFuture.from(immediateFuture("value 0"));
400     ClosingFuture<String> step1 =
401         step0.transform(
402             new ClosingFunction<String, String>() {
403               @Override
404               public String apply(DeferredCloser closer, String v) throws Exception {
405                 closer.eventuallyClose(closeable1, closingExecutor);
406                 return "value 1";
407               }
408             },
409             executor);
410     Waiter step2Waiter = new Waiter();
411     ClosingFuture<String> step2 =
412         step1.transform(
413             step2Waiter.waitFor(
414                 new ClosingFunction<String, String>() {
415                   @Override
416                   public String apply(DeferredCloser closer, String v) throws Exception {
417                     closer.eventuallyClose(closeable2, closingExecutor);
418                     return "value 2";
419                   }
420                 }),
421             executor);
422     ClosingFuture<String> step3 =
423         step2.transform(
424             new ClosingFunction<String, String>() {
425               @Override
426               public String apply(DeferredCloser closer, String input) throws Exception {
427                 closer.eventuallyClose(closeable3, closingExecutor);
428                 return "value 3";
429               }
430             },
431             executor);
432     Waiter step4Waiter = new Waiter();
433     ClosingFuture<String> step4 =
434         step3.catching(
435             CancellationException.class,
436             step4Waiter.waitFor(
437                 new ClosingFunction<CancellationException, String>() {
438                   @Override
439                   public String apply(DeferredCloser closer, CancellationException input)
440                       throws Exception {
441                     closer.eventuallyClose(closeable4, closingExecutor);
442                     return "value 4";
443                   }
444                 }),
445             executor);
446 
447     // Pause in step 2.
448     step2Waiter.awaitStarted();
449 
450     // Everything should still be open.
451     assertStillOpen(closeable1, closeable2, closeable3, closeable4);
452 
453     // Cancel step 3, resume step 2, and pause in step 4.
454     assertWithMessage("step3.cancel()").that(step3.cancel(false)).isTrue();
455     step2Waiter.awaitReturned();
456     step4Waiter.awaitStarted();
457 
458     // Step 1 is not cancelled because it was done.
459     assertWithMessage("step1.statusFuture().isCancelled()")
460         .that(step1.statusFuture().isCancelled())
461         .isFalse();
462     // But its closeable is closed.
463     assertClosed(closeable1);
464 
465     // Step 2 is cancelled because it wasn't complete.
466     assertWithMessage("step2.statusFuture().isCancelled()")
467         .that(step2.statusFuture().isCancelled())
468         .isTrue();
469     // Its closeable is closed.
470     assertClosed(closeable2);
471 
472     // Step 3 was cancelled before it began
473     assertWithMessage("step3.statusFuture().isCancelled()")
474         .that(step3.statusFuture().isCancelled())
475         .isTrue();
476     // Its closeable is still open.
477     assertStillOpen(closeable3);
478 
479     // Step 4 is not cancelled, because it caught the cancellation.
480     assertWithMessage("step4.statusFuture().isCancelled()")
481         .that(step4.statusFuture().isCancelled())
482         .isFalse();
483     // Its closeable isn't closed yet.
484     assertStillOpen(closeable4);
485 
486     // Resume step 4 and complete.
487     step4Waiter.awaitReturned();
488     assertThat(getFinalValue(step4)).isEqualTo("value 4");
489 
490     // Step 4's closeable is now closed.
491     assertClosed(closeable4);
492     // Step 3 still never ran, so its closeable should still be open.
493     assertStillOpen(closeable3);
494   }
495 
testTransform()496   public void testTransform() throws Exception {
497     ClosingFuture<String> closingFuture =
498         ClosingFuture.from(immediateFuture("value"))
499             .transform(
500                 new ClosingFunction<String, TestCloseable>() {
501                   @Override
502                   public TestCloseable apply(DeferredCloser closer, String v) throws Exception {
503                     closer.eventuallyClose(closeable1, closingExecutor);
504                     closer.eventuallyClose(closeable2, closingExecutor);
505                     return closeable3;
506                   }
507                 },
508                 executor)
509             .transform(
510                 new ClosingFunction<TestCloseable, String>() {
511                   @Override
512                   public String apply(DeferredCloser closer, TestCloseable v) throws Exception {
513                     assertThat(v).isSameInstanceAs(closeable3);
514                     assertStillOpen(closeable1, closeable2, closeable3);
515                     return "value";
516                   }
517                 },
518                 executor);
519     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
520     waitUntilClosed(closingFuture);
521     assertClosed(closeable1, closeable2);
522     assertStillOpen(closeable3);
523   }
524 
testTransform_cancelledPipeline()525   public void testTransform_cancelledPipeline() throws Exception {
526     String value = "value";
527     ClosingFuture<TestCloseable> closingFuture =
528         ClosingFuture.from(immediateFuture(value))
529             .transform(
530                 new ClosingFunction<String, TestCloseable>() {
531                   @Override
532                   public TestCloseable apply(DeferredCloser closer, String v) throws Exception {
533                     return closer.eventuallyClose(closeable1, closingExecutor);
534                   }
535                 },
536                 executor)
537             .transform(
538                 waiter.waitFor(
539                     new ClosingFunction<TestCloseable, TestCloseable>() {
540                       @Override
541                       public TestCloseable apply(DeferredCloser closer, TestCloseable v)
542                           throws Exception {
543                         awaitUninterruptibly(futureCancelled);
544                         closer.eventuallyClose(closeable2, closingExecutor);
545                         closer.eventuallyClose(closeable3, closingExecutor);
546                         return closeable4;
547                       }
548                     }),
549                 executor);
550     waiter.awaitStarted();
551     cancelFinalStepAndWait(closingFuture);
552     waiter.awaitReturned();
553     assertClosed(closeable1, closeable2, closeable3);
554     assertStillOpen(closeable4);
555   }
556 
testTransform_throws()557   public void testTransform_throws() throws Exception {
558     ClosingFuture<Object> closingFuture =
559         ClosingFuture.from(immediateFuture("value"))
560             .transform(
561                 new ClosingFunction<String, Object>() {
562                   @Override
563                   public Object apply(DeferredCloser closer, String v) throws Exception {
564                     closer.eventuallyClose(closeable1, closingExecutor);
565                     closer.eventuallyClose(closeable2, closingExecutor);
566                     throw exception;
567                   }
568                 },
569                 executor);
570     assertFinallyFailsWithException(closingFuture);
571     waitUntilClosed(closingFuture);
572     assertClosed(closeable1, closeable2);
573   }
574 
testTransformAsync()575   public void testTransformAsync() throws Exception {
576     ClosingFuture<String> closingFuture =
577         ClosingFuture.from(immediateFuture("value"))
578             .transformAsync(
579                 new AsyncClosingFunction<String, TestCloseable>() {
580                   @Override
581                   public ClosingFuture<TestCloseable> apply(DeferredCloser closer, String v)
582                       throws Exception {
583                     closer.eventuallyClose(closeable1, closingExecutor);
584                     closer.eventuallyClose(closeable2, closingExecutor);
585                     return ClosingFuture.eventuallyClosing(
586                         immediateFuture(closeable3), closingExecutor);
587                   }
588                 },
589                 executor)
590             .transform(
591                 new ClosingFunction<TestCloseable, String>() {
592                   @Override
593                   public String apply(DeferredCloser closer, TestCloseable v) throws Exception {
594                     assertThat(v).isSameInstanceAs(closeable3);
595                     assertStillOpen(closeable1, closeable2, closeable3);
596                     return "value";
597                   }
598                 },
599                 executor);
600     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
601     waitUntilClosed(closingFuture);
602     assertClosed(closeable1, closeable2, closeable3);
603   }
604 
testTransformAsync_cancelledPipeline()605   public void testTransformAsync_cancelledPipeline() throws Exception {
606     ClosingFuture<TestCloseable> closingFuture =
607         ClosingFuture.from(immediateFuture("value"))
608             .transformAsync(
609                 waiter.waitFor(
610                     new AsyncClosingFunction<String, TestCloseable>() {
611                       @Override
612                       public ClosingFuture<TestCloseable> apply(DeferredCloser closer, String v)
613                           throws Exception {
614                         awaitUninterruptibly(futureCancelled);
615                         closer.eventuallyClose(closeable1, closingExecutor);
616                         closer.eventuallyClose(closeable2, closingExecutor);
617                         return ClosingFuture.eventuallyClosing(
618                             immediateFuture(closeable3), closingExecutor);
619                       }
620                     }),
621                 executor);
622     waiter.awaitStarted();
623     cancelFinalStepAndWait(closingFuture);
624     // not closed until the function returns
625     assertStillOpen(closeable1, closeable2, closeable3);
626     waiter.awaitReturned();
627     assertClosed(closeable1, closeable2, closeable3);
628   }
629 
testTransformAsync_throws()630   public void testTransformAsync_throws() throws Exception {
631     ClosingFuture<Object> closingFuture =
632         ClosingFuture.from(immediateFuture("value"))
633             .transformAsync(
634                 new AsyncClosingFunction<String, Object>() {
635                   @Override
636                   public ClosingFuture<Object> apply(DeferredCloser closer, String v)
637                       throws Exception {
638                     closer.eventuallyClose(closeable1, closingExecutor);
639                     closer.eventuallyClose(closeable2, closingExecutor);
640                     throw exception;
641                   }
642                 },
643                 executor);
644     assertFinallyFailsWithException(closingFuture);
645     waitUntilClosed(closingFuture);
646     assertClosed(closeable1, closeable2);
647   }
648 
testTransformAsync_failed()649   public void testTransformAsync_failed() throws Exception {
650     ClosingFuture<Object> closingFuture =
651         ClosingFuture.from(immediateFuture("value"))
652             .transformAsync(
653                 new AsyncClosingFunction<String, Object>() {
654                   @Override
655                   public ClosingFuture<Object> apply(DeferredCloser closer, String v)
656                       throws Exception {
657                     closer.eventuallyClose(closeable1, closingExecutor);
658                     closer.eventuallyClose(closeable2, closingExecutor);
659                     return failedClosingFuture();
660                   }
661                 },
662                 executor);
663     assertFinallyFailsWithException(closingFuture);
664     waitUntilClosed(closingFuture);
665     assertClosed(closeable1, closeable2);
666   }
667 
testTransformAsync_withoutCloser()668   public void testTransformAsync_withoutCloser() throws Exception {
669     ClosingFuture<String> closingFuture =
670         ClosingFuture.submit(
671                 new ClosingCallable<TestCloseable>() {
672                   @Override
673                   public TestCloseable call(DeferredCloser closer) throws Exception {
674                     return closer.eventuallyClose(closeable1, closingExecutor);
675                   }
676                 },
677                 executor)
678             .transformAsync(
679                 ClosingFuture.withoutCloser(
680                     new AsyncFunction<TestCloseable, String>() {
681                       @Override
682                       public ListenableFuture<String> apply(TestCloseable v) throws Exception {
683                         assertThat(v).isSameInstanceAs(closeable1);
684                         assertStillOpen(closeable1);
685                         return immediateFuture("value");
686                       }
687                     }),
688                 executor);
689     assertThat(getFinalValue(closingFuture)).isEqualTo("value");
690     waitUntilClosed(closingFuture);
691     assertClosed(closeable1);
692   }
693 
testWhenAllComplete_call()694   public void testWhenAllComplete_call() throws Exception {
695     final ClosingFuture<String> input1 = ClosingFuture.from(immediateFuture("value1"));
696     final ClosingFuture<Object> input2Failed = failedClosingFuture();
697     final ClosingFuture<String> nonInput = ClosingFuture.from(immediateFuture("value3"));
698     final AtomicReference<ClosingFuture.Peeker> capturedPeeker = new AtomicReference<>();
699     ClosingFuture<TestCloseable> closingFuture =
700         ClosingFuture.whenAllComplete(ImmutableList.of(input1, input2Failed))
701             .call(
702                 new CombiningCallable<TestCloseable>() {
703                   @Override
704                   public TestCloseable call(DeferredCloser closer, Peeker peeker) throws Exception {
705                     closer.eventuallyClose(closeable1, closingExecutor);
706                     assertThat(peeker.getDone(input1)).isSameInstanceAs("value1");
707                     try {
708                       peeker.getDone(input2Failed);
709                       fail("Peeker.getDone() should fail for failed inputs");
710                     } catch (ExecutionException expected) {
711                     }
712                     try {
713                       peeker.getDone(nonInput);
714                       fail("Peeker should not be able to peek into non-input ClosingFuture.");
715                     } catch (IllegalArgumentException expected) {
716                     }
717                     capturedPeeker.set(peeker);
718                     return closeable2;
719                   }
720                 },
721                 executor);
722     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
723     waitUntilClosed(closingFuture);
724     assertStillOpen(closeable2);
725     assertClosed(closeable1);
726     assertThrows(IllegalStateException.class, () -> capturedPeeker.get().getDone(input1));
727   }
728 
testWhenAllComplete_call_cancelledPipeline()729   public void testWhenAllComplete_call_cancelledPipeline() throws Exception {
730     ClosingFuture<TestCloseable> closingFuture =
731         ClosingFuture.whenAllComplete(
732                 ImmutableList.of(
733                     ClosingFuture.from(immediateFuture(closeable1)),
734                     ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
735             .call(
736                 waiter.waitFor(
737                     new CombiningCallable<TestCloseable>() {
738                       @Override
739                       public TestCloseable call(DeferredCloser closer, Peeker peeker)
740                           throws Exception {
741                         awaitUninterruptibly(futureCancelled);
742                         closer.eventuallyClose(closeable1, closingExecutor);
743                         return closeable3;
744                       }
745                     }),
746                 executor);
747     waiter.awaitStarted();
748     cancelFinalStepAndWait(closingFuture);
749     waiter.awaitReturned();
750     assertClosed(closeable1, closeable2);
751     assertStillOpen(closeable3);
752   }
753 
testWhenAllComplete_call_throws()754   public void testWhenAllComplete_call_throws() throws Exception {
755     ClosingFuture<Object> closingFuture =
756         ClosingFuture.whenAllComplete(
757                 ImmutableList.of(
758                     ClosingFuture.from(immediateFuture(closeable1)),
759                     ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
760             .call(
761                 new CombiningCallable<Object>() {
762                   @Override
763                   public Object call(DeferredCloser closer, Peeker peeker) throws Exception {
764                     closer.eventuallyClose(closeable3, closingExecutor);
765                     throw exception;
766                   }
767                 },
768                 executor);
769     assertFinallyFailsWithException(closingFuture);
770     waitUntilClosed(closingFuture);
771     assertStillOpen(closeable1);
772     assertClosed(closeable2, closeable3);
773   }
774 
testWhenAllComplete_callAsync()775   public void testWhenAllComplete_callAsync() throws Exception {
776     final ClosingFuture<String> input1 = ClosingFuture.from(immediateFuture("value1"));
777     final ClosingFuture<Object> input2Failed = failedClosingFuture();
778     final ClosingFuture<String> nonInput = ClosingFuture.from(immediateFuture("value3"));
779     final AtomicReference<ClosingFuture.Peeker> capturedPeeker = new AtomicReference<>();
780     ClosingFuture<TestCloseable> closingFuture =
781         ClosingFuture.whenAllComplete(ImmutableList.of(input1, input2Failed))
782             .callAsync(
783                 new AsyncCombiningCallable<TestCloseable>() {
784                   @Override
785                   public ClosingFuture<TestCloseable> call(DeferredCloser closer, Peeker peeker)
786                       throws Exception {
787                     closer.eventuallyClose(closeable1, closingExecutor);
788                     assertThat(peeker.getDone(input1)).isSameInstanceAs("value1");
789                     try {
790                       peeker.getDone(input2Failed);
791                       fail("Peeker should fail for failed inputs");
792                     } catch (ExecutionException expected) {
793                     }
794                     try {
795                       peeker.getDone(nonInput);
796                       fail("Peeker should not be able to peek into non-input ClosingFuture.");
797                     } catch (IllegalArgumentException expected) {
798                     }
799                     capturedPeeker.set(peeker);
800                     return ClosingFuture.eventuallyClosing(
801                         immediateFuture(closeable2), closingExecutor);
802                   }
803                 },
804                 executor);
805     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
806     waitUntilClosed(closingFuture);
807     assertClosed(closeable1, closeable2);
808     assertThrows(IllegalStateException.class, () -> capturedPeeker.get().getDone(input1));
809   }
810 
testWhenAllComplete_callAsync_cancelledPipeline()811   public void testWhenAllComplete_callAsync_cancelledPipeline() throws Exception {
812     ClosingFuture<TestCloseable> closingFuture =
813         ClosingFuture.whenAllComplete(
814                 ImmutableList.of(
815                     ClosingFuture.from(immediateFuture(closeable1)),
816                     ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
817             .callAsync(
818                 waiter.waitFor(
819                     new AsyncCombiningCallable<TestCloseable>() {
820                       @Override
821                       public ClosingFuture<TestCloseable> call(DeferredCloser closer, Peeker peeker)
822                           throws Exception {
823                         awaitUninterruptibly(futureCancelled);
824                         closer.eventuallyClose(closeable1, closingExecutor);
825                         return ClosingFuture.eventuallyClosing(
826                             immediateFuture(closeable3), closingExecutor);
827                       }
828                     }),
829                 executor);
830     waiter.awaitStarted();
831     cancelFinalStepAndWait(closingFuture);
832     waiter.awaitReturned();
833     assertClosed(closeable1, closeable2, closeable3);
834   }
835 
testWhenAllComplete_callAsync_throws()836   public void testWhenAllComplete_callAsync_throws() throws Exception {
837     ClosingFuture<Object> closingFuture =
838         ClosingFuture.whenAllComplete(
839                 ImmutableList.of(
840                     ClosingFuture.from(immediateFuture(closeable1)),
841                     ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
842             .callAsync(
843                 new AsyncCombiningCallable<Object>() {
844                   @Override
845                   public ClosingFuture<Object> call(DeferredCloser closer, Peeker peeker)
846                       throws Exception {
847                     closer.eventuallyClose(closeable3, closingExecutor);
848                     throw exception;
849                   }
850                 },
851                 executor);
852     assertFinallyFailsWithException(closingFuture);
853     waitUntilClosed(closingFuture);
854     assertStillOpen(closeable1);
855     assertClosed(closeable2, closeable3);
856   }
857 
858   // We don't need to test the happy case for SuccessfulCombiner.call(Async) because it's the same
859   // as Combiner.
860 
testWhenAllSucceed_call_failedInput()861   public void testWhenAllSucceed_call_failedInput() throws Exception {
862     assertFinallyFailsWithException(
863         ClosingFuture.whenAllSucceed(
864                 ImmutableList.of(
865                     ClosingFuture.from(immediateFuture("value")), failedClosingFuture()))
866             .call(
867                 new CombiningCallable<Object>() {
868                   @Override
869                   public Object call(DeferredCloser closer, Peeker peeker) throws Exception {
870                     expect.fail();
871                     throw new AssertionError();
872                   }
873                 },
874                 executor));
875   }
876 
testWhenAllSucceed_callAsync_failedInput()877   public void testWhenAllSucceed_callAsync_failedInput() throws Exception {
878     assertFinallyFailsWithException(
879         ClosingFuture.whenAllSucceed(
880                 ImmutableList.of(
881                     ClosingFuture.from(immediateFuture("value")), failedClosingFuture()))
882             .callAsync(
883                 new AsyncCombiningCallable<Object>() {
884                   @Override
885                   public ClosingFuture<Object> call(DeferredCloser closer, Peeker peeker)
886                       throws Exception {
887                     expect.fail();
888                     throw new AssertionError();
889                   }
890                 },
891                 executor));
892   }
893 
testWhenAllSucceed2_call()894   public void testWhenAllSucceed2_call() throws ExecutionException, IOException {
895     ClosingFuture<TestCloseable> closingFuture =
896         ClosingFuture.whenAllSucceed(
897                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
898                 ClosingFuture.from(immediateFuture("value1")))
899             .call(
900                 new ClosingFunction2<TestCloseable, String, TestCloseable>() {
901                   @Override
902                   public TestCloseable apply(DeferredCloser closer, TestCloseable v1, String v2)
903                       throws Exception {
904                     assertThat(v1).isEqualTo(closeable1);
905                     assertThat(v2).isEqualTo("value1");
906                     assertStillOpen(closeable1);
907                     closer.eventuallyClose(closeable2, closingExecutor);
908                     return closeable2;
909                   }
910                 },
911                 executor);
912     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
913     waitUntilClosed(closingFuture);
914     assertClosed(closeable1, closeable2);
915   }
916 
testWhenAllSucceed2_call_failedInput()917   public void testWhenAllSucceed2_call_failedInput() throws ExecutionException, IOException {
918     ClosingFuture<Object> closingFuture =
919         ClosingFuture.whenAllSucceed(
920                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
921                 failedClosingFuture())
922             .call(
923                 new ClosingFunction2<TestCloseable, Object, Object>() {
924                   @Override
925                   public Object apply(DeferredCloser closer, TestCloseable v1, Object v2)
926                       throws Exception {
927                     expect.fail();
928                     throw new AssertionError();
929                   }
930                 },
931                 executor);
932     assertFinallyFailsWithException(closingFuture);
933     waitUntilClosed(closingFuture);
934     assertClosed(closeable1);
935   }
936 
testWhenAllSucceed2_call_cancelledPipeline()937   public void testWhenAllSucceed2_call_cancelledPipeline() throws Exception {
938     ClosingFuture<TestCloseable> closingFuture =
939         ClosingFuture.whenAllSucceed(
940                 ClosingFuture.from(immediateFuture(closeable1)),
941                 ClosingFuture.from(immediateFuture(closeable2)))
942             .call(
943                 waiter.waitFor(
944                     new ClosingFunction2<TestCloseable, TestCloseable, TestCloseable>() {
945                       @Override
946                       public TestCloseable apply(
947                           DeferredCloser closer, TestCloseable v1, TestCloseable v2)
948                           throws Exception {
949                         awaitUninterruptibly(futureCancelled);
950                         closer.eventuallyClose(closeable1, closingExecutor);
951                         closer.eventuallyClose(closeable2, closingExecutor);
952                         return closeable3;
953                       }
954                     }),
955                 executor);
956     waiter.awaitStarted();
957     cancelFinalStepAndWait(closingFuture);
958     // not closed until the function returns
959     assertStillOpen(closeable1, closeable2);
960     waiter.awaitReturned();
961     assertClosed(closeable1, closeable2);
962     assertStillOpen(closeable3);
963   }
964 
testWhenAllSucceed2_call_throws()965   public void testWhenAllSucceed2_call_throws() throws Exception {
966     ClosingFuture<Object> closingFuture =
967         ClosingFuture.whenAllSucceed(
968                 ClosingFuture.from(immediateFuture(closeable1)),
969                 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))
970             .call(
971                 new ClosingFunction2<TestCloseable, TestCloseable, Object>() {
972                   @Override
973                   public Object apply(DeferredCloser closer, TestCloseable v1, TestCloseable v2)
974                       throws Exception {
975                     closer.eventuallyClose(closeable3, closingExecutor);
976                     throw exception;
977                   }
978                 },
979                 executor);
980     assertFinallyFailsWithException(closingFuture);
981     waitUntilClosed(closingFuture);
982     assertStillOpen(closeable1);
983     assertClosed(closeable2, closeable3);
984   }
985 
testWhenAllSucceed2_callAsync()986   public void testWhenAllSucceed2_callAsync() throws ExecutionException, IOException {
987     ClosingFuture<TestCloseable> closingFuture =
988         ClosingFuture.whenAllSucceed(
989                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
990                 ClosingFuture.from(immediateFuture("value1")))
991             .callAsync(
992                 new AsyncClosingFunction2<TestCloseable, String, TestCloseable>() {
993                   @Override
994                   public ClosingFuture<TestCloseable> apply(
995                       DeferredCloser closer, TestCloseable v1, String v2) throws Exception {
996                     assertThat(v1).isEqualTo(closeable1);
997                     assertThat(v2).isEqualTo("value1");
998                     assertStillOpen(closeable1);
999                     closer.eventuallyClose(closeable2, closingExecutor);
1000                     return ClosingFuture.eventuallyClosing(
1001                         immediateFuture(closeable3), closingExecutor);
1002                   }
1003                 },
1004                 executor);
1005     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable3);
1006     waitUntilClosed(closingFuture);
1007     assertClosed(closeable1, closeable2, closeable3);
1008   }
1009 
testWhenAllSucceed2_callAsync_failedInput()1010   public void testWhenAllSucceed2_callAsync_failedInput() throws ExecutionException, IOException {
1011     ClosingFuture<Object> closingFuture =
1012         ClosingFuture.whenAllSucceed(
1013                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1014                 failedClosingFuture())
1015             .callAsync(
1016                 new AsyncClosingFunction2<TestCloseable, Object, Object>() {
1017                   @Override
1018                   public ClosingFuture<Object> apply(
1019                       DeferredCloser closer, TestCloseable v1, Object v2) throws Exception {
1020                     expect.fail();
1021                     throw new AssertionError();
1022                   }
1023                 },
1024                 executor);
1025     assertFinallyFailsWithException(closingFuture);
1026     waitUntilClosed(closingFuture);
1027     assertClosed(closeable1);
1028   }
1029 
testWhenAllSucceed2_callAsync_cancelledPipeline()1030   public void testWhenAllSucceed2_callAsync_cancelledPipeline() throws Exception {
1031     ClosingFuture<TestCloseable> closingFuture =
1032         ClosingFuture.whenAllSucceed(
1033                 ClosingFuture.from(immediateFuture(closeable1)),
1034                 ClosingFuture.from(immediateFuture(closeable2)))
1035             .callAsync(
1036                 waiter.waitFor(
1037                     new AsyncClosingFunction2<TestCloseable, TestCloseable, TestCloseable>() {
1038                       @Override
1039                       public ClosingFuture<TestCloseable> apply(
1040                           DeferredCloser closer, TestCloseable v1, TestCloseable v2)
1041                           throws Exception {
1042                         awaitUninterruptibly(futureCancelled);
1043                         closer.eventuallyClose(closeable1, closingExecutor);
1044                         closer.eventuallyClose(closeable2, closingExecutor);
1045                         return ClosingFuture.eventuallyClosing(
1046                             immediateFuture(closeable3), closingExecutor);
1047                       }
1048                     }),
1049                 executor);
1050     waiter.awaitStarted();
1051     cancelFinalStepAndWait(closingFuture);
1052     // not closed until the function returns
1053     assertStillOpen(closeable1, closeable2, closeable3);
1054     waiter.awaitReturned();
1055     assertClosed(closeable1, closeable2, closeable3);
1056   }
1057 
testWhenAllSucceed2_callAsync_throws()1058   public void testWhenAllSucceed2_callAsync_throws() throws Exception {
1059     ClosingFuture<Object> closingFuture =
1060         ClosingFuture.whenAllSucceed(
1061                 ClosingFuture.from(immediateFuture(closeable1)),
1062                 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))
1063             .callAsync(
1064                 new AsyncClosingFunction2<TestCloseable, TestCloseable, Object>() {
1065                   @Override
1066                   public ClosingFuture<Object> apply(
1067                       DeferredCloser closer, TestCloseable v1, TestCloseable v2) throws Exception {
1068                     closer.eventuallyClose(closeable3, closingExecutor);
1069                     throw exception;
1070                   }
1071                 },
1072                 executor);
1073     assertFinallyFailsWithException(closingFuture);
1074     waitUntilClosed(closingFuture);
1075     assertStillOpen(closeable1);
1076     assertClosed(closeable2, closeable3);
1077   }
1078 
testWhenAllSucceed3_call()1079   public void testWhenAllSucceed3_call() throws ExecutionException, IOException {
1080     ClosingFuture<TestCloseable> closingFuture =
1081         ClosingFuture.whenAllSucceed(
1082                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1083                 ClosingFuture.from(immediateFuture("value2")),
1084                 ClosingFuture.from(immediateFuture("value3")))
1085             .call(
1086                 new ClosingFunction3<TestCloseable, String, String, TestCloseable>() {
1087                   @Override
1088                   public TestCloseable apply(
1089                       DeferredCloser closer, TestCloseable v1, String v2, String v3)
1090                       throws Exception {
1091                     assertThat(v1).isEqualTo(closeable1);
1092                     assertThat(v2).isEqualTo("value2");
1093                     assertThat(v3).isEqualTo("value3");
1094                     assertStillOpen(closeable1);
1095                     closer.eventuallyClose(closeable2, closingExecutor);
1096                     return closeable2;
1097                   }
1098                 },
1099                 executor);
1100     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
1101     waitUntilClosed(closingFuture);
1102     assertClosed(closeable1, closeable2);
1103   }
1104 
testWhenAllSucceed3_call_failedInput()1105   public void testWhenAllSucceed3_call_failedInput() throws ExecutionException, IOException {
1106     ClosingFuture<Object> closingFuture =
1107         ClosingFuture.whenAllSucceed(
1108                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1109                 failedClosingFuture(),
1110                 ClosingFuture.from(immediateFuture("value3")))
1111             .call(
1112                 new ClosingFunction3<TestCloseable, Object, String, Object>() {
1113                   @Override
1114                   public Object apply(DeferredCloser closer, TestCloseable v1, Object v2, String v3)
1115                       throws Exception {
1116                     expect.fail();
1117                     throw new AssertionError();
1118                   }
1119                 },
1120                 executor);
1121     assertFinallyFailsWithException(closingFuture);
1122     waitUntilClosed(closingFuture);
1123     assertClosed(closeable1);
1124   }
1125 
testWhenAllSucceed3_call_cancelledPipeline()1126   public void testWhenAllSucceed3_call_cancelledPipeline() throws Exception {
1127     ClosingFuture<TestCloseable> closingFuture =
1128         ClosingFuture.whenAllSucceed(
1129                 ClosingFuture.from(immediateFuture(closeable1)),
1130                 ClosingFuture.from(immediateFuture(closeable2)),
1131                 ClosingFuture.from(immediateFuture("value3")))
1132             .call(
1133                 waiter.waitFor(
1134                     new ClosingFunction3<TestCloseable, TestCloseable, String, TestCloseable>() {
1135                       @Override
1136                       public TestCloseable apply(
1137                           DeferredCloser closer, TestCloseable v1, TestCloseable v2, String v3)
1138                           throws Exception {
1139                         awaitUninterruptibly(futureCancelled);
1140                         closer.eventuallyClose(closeable1, closingExecutor);
1141                         closer.eventuallyClose(closeable2, closingExecutor);
1142                         return closeable3;
1143                       }
1144                     }),
1145                 executor);
1146     waiter.awaitStarted();
1147     cancelFinalStepAndWait(closingFuture);
1148     // not closed until the function returns
1149     assertStillOpen(closeable1, closeable2);
1150     waiter.awaitReturned();
1151     assertClosed(closeable1, closeable2);
1152     assertStillOpen(closeable3);
1153   }
1154 
testWhenAllSucceed3_call_throws()1155   public void testWhenAllSucceed3_call_throws() throws Exception {
1156     ClosingFuture<Object> closingFuture =
1157         ClosingFuture.whenAllSucceed(
1158                 ClosingFuture.from(immediateFuture(closeable1)),
1159                 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor),
1160                 ClosingFuture.from(immediateFuture("value3")))
1161             .call(
1162                 new ClosingFunction3<TestCloseable, TestCloseable, String, Object>() {
1163                   @Override
1164                   public Object apply(
1165                       DeferredCloser closer, TestCloseable v1, TestCloseable v2, String v3)
1166                       throws Exception {
1167                     closer.eventuallyClose(closeable3, closingExecutor);
1168                     throw exception;
1169                   }
1170                 },
1171                 executor);
1172     assertFinallyFailsWithException(closingFuture);
1173     waitUntilClosed(closingFuture);
1174     assertStillOpen(closeable1);
1175     assertClosed(closeable2, closeable3);
1176   }
1177 
testWhenAllSucceed4_call()1178   public void testWhenAllSucceed4_call() throws ExecutionException, IOException {
1179     ClosingFuture<TestCloseable> closingFuture =
1180         ClosingFuture.whenAllSucceed(
1181                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1182                 ClosingFuture.from(immediateFuture("value2")),
1183                 ClosingFuture.from(immediateFuture("value3")),
1184                 ClosingFuture.from(immediateFuture("value4")))
1185             .call(
1186                 new ClosingFunction4<TestCloseable, String, String, String, TestCloseable>() {
1187                   @Override
1188                   public TestCloseable apply(
1189                       DeferredCloser closer, TestCloseable v1, String v2, String v3, String v4)
1190                       throws Exception {
1191                     assertThat(v1).isEqualTo(closeable1);
1192                     assertThat(v2).isEqualTo("value2");
1193                     assertThat(v3).isEqualTo("value3");
1194                     assertThat(v4).isEqualTo("value4");
1195                     assertStillOpen(closeable1);
1196                     closer.eventuallyClose(closeable2, closingExecutor);
1197                     return closeable2;
1198                   }
1199                 },
1200                 executor);
1201     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
1202     waitUntilClosed(closingFuture);
1203     assertClosed(closeable1, closeable2);
1204   }
1205 
testWhenAllSucceed4_call_failedInput()1206   public void testWhenAllSucceed4_call_failedInput() throws ExecutionException, IOException {
1207     ClosingFuture<Object> closingFuture =
1208         ClosingFuture.whenAllSucceed(
1209                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1210                 failedClosingFuture(),
1211                 ClosingFuture.from(immediateFuture("value3")),
1212                 ClosingFuture.from(immediateFuture("value4")))
1213             .call(
1214                 new ClosingFunction4<TestCloseable, Object, String, String, Object>() {
1215                   @Override
1216                   public Object apply(
1217                       DeferredCloser closer, TestCloseable v1, Object v2, String v3, String v4)
1218                       throws Exception {
1219                     expect.fail();
1220                     throw new AssertionError();
1221                   }
1222                 },
1223                 executor);
1224     assertFinallyFailsWithException(closingFuture);
1225     waitUntilClosed(closingFuture);
1226     assertClosed(closeable1);
1227   }
1228 
testWhenAllSucceed4_call_cancelledPipeline()1229   public void testWhenAllSucceed4_call_cancelledPipeline() throws Exception {
1230     ClosingFuture<TestCloseable> closingFuture =
1231         ClosingFuture.whenAllSucceed(
1232                 ClosingFuture.from(immediateFuture(closeable1)),
1233                 ClosingFuture.from(immediateFuture(closeable2)),
1234                 ClosingFuture.from(immediateFuture("value3")),
1235                 ClosingFuture.from(immediateFuture("value4")))
1236             .call(
1237                 waiter.waitFor(
1238                     new ClosingFunction4<
1239                         TestCloseable, TestCloseable, String, String, TestCloseable>() {
1240                       @Override
1241                       public TestCloseable apply(
1242                           DeferredCloser closer,
1243                           TestCloseable v1,
1244                           TestCloseable v2,
1245                           String v3,
1246                           String v4)
1247                           throws Exception {
1248                         awaitUninterruptibly(futureCancelled);
1249                         closer.eventuallyClose(closeable1, closingExecutor);
1250                         closer.eventuallyClose(closeable2, closingExecutor);
1251                         return closeable3;
1252                       }
1253                     }),
1254                 executor);
1255     waiter.awaitStarted();
1256     cancelFinalStepAndWait(closingFuture);
1257     // not closed until the function returns
1258     assertStillOpen(closeable1, closeable2);
1259     waiter.awaitReturned();
1260     assertClosed(closeable1, closeable2);
1261     assertStillOpen(closeable3);
1262   }
1263 
testWhenAllSucceed4_call_throws()1264   public void testWhenAllSucceed4_call_throws() throws Exception {
1265     ClosingFuture<Object> closingFuture =
1266         ClosingFuture.whenAllSucceed(
1267                 ClosingFuture.from(immediateFuture(closeable1)),
1268                 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor),
1269                 ClosingFuture.from(immediateFuture("value3")),
1270                 ClosingFuture.from(immediateFuture("value4")))
1271             .call(
1272                 new ClosingFunction4<TestCloseable, TestCloseable, String, String, Object>() {
1273                   @Override
1274                   public Object apply(
1275                       DeferredCloser closer,
1276                       TestCloseable v1,
1277                       TestCloseable v2,
1278                       String v3,
1279                       String v4)
1280                       throws Exception {
1281                     closer.eventuallyClose(closeable3, closingExecutor);
1282                     throw exception;
1283                   }
1284                 },
1285                 executor);
1286     assertFinallyFailsWithException(closingFuture);
1287     waitUntilClosed(closingFuture);
1288     assertStillOpen(closeable1);
1289     assertClosed(closeable2, closeable3);
1290   }
1291 
testWhenAllSucceed5_call()1292   public void testWhenAllSucceed5_call() throws ExecutionException, IOException {
1293     ClosingFuture<TestCloseable> closingFuture =
1294         ClosingFuture.whenAllSucceed(
1295                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1296                 ClosingFuture.from(immediateFuture("value2")),
1297                 ClosingFuture.from(immediateFuture("value3")),
1298                 ClosingFuture.from(immediateFuture("value4")),
1299                 ClosingFuture.from(immediateFuture("value5")))
1300             .call(
1301                 new ClosingFunction5<
1302                     TestCloseable, String, String, String, String, TestCloseable>() {
1303                   @Override
1304                   public TestCloseable apply(
1305                       DeferredCloser closer,
1306                       TestCloseable v1,
1307                       String v2,
1308                       String v3,
1309                       String v4,
1310                       String v5)
1311                       throws Exception {
1312                     assertThat(v1).isEqualTo(closeable1);
1313                     assertThat(v2).isEqualTo("value2");
1314                     assertThat(v3).isEqualTo("value3");
1315                     assertThat(v4).isEqualTo("value4");
1316                     assertThat(v5).isEqualTo("value5");
1317                     assertStillOpen(closeable1);
1318                     closer.eventuallyClose(closeable2, closingExecutor);
1319                     return closeable2;
1320                   }
1321                 },
1322                 executor);
1323     assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
1324     waitUntilClosed(closingFuture);
1325     assertClosed(closeable1, closeable2);
1326   }
1327 
testWhenAllSucceed5_call_failedInput()1328   public void testWhenAllSucceed5_call_failedInput() throws ExecutionException, IOException {
1329     ClosingFuture<Object> closingFuture =
1330         ClosingFuture.whenAllSucceed(
1331                 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor),
1332                 failedClosingFuture(),
1333                 ClosingFuture.from(immediateFuture("value3")),
1334                 ClosingFuture.from(immediateFuture("value4")),
1335                 ClosingFuture.from(immediateFuture("value5")))
1336             .call(
1337                 new ClosingFunction5<TestCloseable, Object, String, String, String, Object>() {
1338                   @Override
1339                   public Object apply(
1340                       DeferredCloser closer,
1341                       TestCloseable v1,
1342                       Object v2,
1343                       String v3,
1344                       String v4,
1345                       String v5)
1346                       throws Exception {
1347                     expect.fail();
1348                     throw new AssertionError();
1349                   }
1350                 },
1351                 executor);
1352     assertFinallyFailsWithException(closingFuture);
1353     waitUntilClosed(closingFuture);
1354     assertClosed(closeable1);
1355   }
1356 
testWhenAllSucceed5_call_cancelledPipeline()1357   public void testWhenAllSucceed5_call_cancelledPipeline() throws Exception {
1358     ClosingFuture<TestCloseable> closingFuture =
1359         ClosingFuture.whenAllSucceed(
1360                 ClosingFuture.from(immediateFuture(closeable1)),
1361                 ClosingFuture.from(immediateFuture(closeable2)),
1362                 ClosingFuture.from(immediateFuture("value3")),
1363                 ClosingFuture.from(immediateFuture("value4")),
1364                 ClosingFuture.from(immediateFuture("value5")))
1365             .call(
1366                 waiter.waitFor(
1367                     new ClosingFunction5<
1368                         TestCloseable, TestCloseable, String, String, String, TestCloseable>() {
1369                       @Override
1370                       public TestCloseable apply(
1371                           DeferredCloser closer,
1372                           TestCloseable v1,
1373                           TestCloseable v2,
1374                           String v3,
1375                           String v4,
1376                           String v5)
1377                           throws Exception {
1378                         awaitUninterruptibly(futureCancelled);
1379                         closer.eventuallyClose(closeable1, closingExecutor);
1380                         closer.eventuallyClose(closeable2, closingExecutor);
1381                         return closeable3;
1382                       }
1383                     }),
1384                 executor);
1385     waiter.awaitStarted();
1386     cancelFinalStepAndWait(closingFuture);
1387     // not closed until the function returns
1388     assertStillOpen(closeable1, closeable2);
1389     waiter.awaitReturned();
1390     assertClosed(closeable1, closeable2);
1391     assertStillOpen(closeable3);
1392   }
1393 
testWhenAllSucceed5_call_throws()1394   public void testWhenAllSucceed5_call_throws() throws Exception {
1395     ClosingFuture<Object> closingFuture =
1396         ClosingFuture.whenAllSucceed(
1397                 ClosingFuture.from(immediateFuture(closeable1)),
1398                 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor),
1399                 ClosingFuture.from(immediateFuture("value3")),
1400                 ClosingFuture.from(immediateFuture("value4")),
1401                 ClosingFuture.from(immediateFuture("value5")))
1402             .call(
1403                 new ClosingFunction5<
1404                     TestCloseable, TestCloseable, String, String, String, Object>() {
1405                   @Override
1406                   public Object apply(
1407                       DeferredCloser closer,
1408                       TestCloseable v1,
1409                       TestCloseable v2,
1410                       String v3,
1411                       String v4,
1412                       String v5)
1413                       throws Exception {
1414                     closer.eventuallyClose(closeable3, closingExecutor);
1415                     throw exception;
1416                   }
1417                 },
1418                 executor);
1419     assertFinallyFailsWithException(closingFuture);
1420     waitUntilClosed(closingFuture);
1421     assertStillOpen(closeable1);
1422     assertClosed(closeable2, closeable3);
1423   }
1424 
testTransform_preventsFurtherOperations()1425   public void testTransform_preventsFurtherOperations() {
1426     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1427     ClosingFuture<String> unused =
1428         closingFuture.transform(
1429             new ClosingFunction<String, String>() {
1430               @Override
1431               public String apply(DeferredCloser closer, String v) throws Exception {
1432                 return "value2";
1433               }
1434             },
1435             executor);
1436     assertDerivingThrowsIllegalStateException(closingFuture);
1437     assertFinalStepThrowsIllegalStateException(closingFuture);
1438   }
1439 
testTransformAsync_preventsFurtherOperations()1440   public void testTransformAsync_preventsFurtherOperations() {
1441     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1442     ClosingFuture<String> unused =
1443         closingFuture.transformAsync(
1444             new AsyncClosingFunction<String, String>() {
1445               @Override
1446               public ClosingFuture<String> apply(DeferredCloser closer, String v) throws Exception {
1447                 return ClosingFuture.from(immediateFuture("value2"));
1448               }
1449             },
1450             executor);
1451     assertDerivingThrowsIllegalStateException(closingFuture);
1452     assertFinalStepThrowsIllegalStateException(closingFuture);
1453   }
1454 
testCatching_preventsFurtherOperations()1455   public void testCatching_preventsFurtherOperations() {
1456     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1457     ClosingFuture<String> unused =
1458         closingFuture.catching(
1459             Exception.class,
1460             new ClosingFunction<Exception, String>() {
1461               @Override
1462               public String apply(DeferredCloser closer, Exception x) throws Exception {
1463                 return "value2";
1464               }
1465             },
1466             executor);
1467     assertDerivingThrowsIllegalStateException(closingFuture);
1468     assertFinalStepThrowsIllegalStateException(closingFuture);
1469   }
1470 
testCatchingAsync_preventsFurtherOperations()1471   public void testCatchingAsync_preventsFurtherOperations() {
1472     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1473     ClosingFuture<String> unused =
1474         closingFuture.catchingAsync(
1475             Exception.class,
1476             ClosingFuture.withoutCloser(
1477                 new AsyncFunction<Exception, String>() {
1478                   @Override
1479                   public ListenableFuture<String> apply(Exception x) throws Exception {
1480                     return immediateFuture("value2");
1481                   }
1482                 }),
1483             executor);
1484     assertDerivingThrowsIllegalStateException(closingFuture);
1485     assertFinalStepThrowsIllegalStateException(closingFuture);
1486   }
1487 
testWhenAllComplete_preventsFurtherOperations()1488   public void testWhenAllComplete_preventsFurtherOperations() {
1489     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1490     Combiner unused = ClosingFuture.whenAllComplete(asList(closingFuture));
1491     assertDerivingThrowsIllegalStateException(closingFuture);
1492     assertFinalStepThrowsIllegalStateException(closingFuture);
1493   }
1494 
testWhenAllSucceed_preventsFurtherOperations()1495   public void testWhenAllSucceed_preventsFurtherOperations() {
1496     ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1"));
1497     Combiner unused = ClosingFuture.whenAllSucceed(asList(closingFuture));
1498     assertDerivingThrowsIllegalStateException(closingFuture);
1499     assertFinalStepThrowsIllegalStateException(closingFuture);
1500   }
1501 
assertDerivingThrowsIllegalStateException( ClosingFuture<String> closingFuture)1502   protected final void assertDerivingThrowsIllegalStateException(
1503       ClosingFuture<String> closingFuture) {
1504     try {
1505       closingFuture.transform(
1506           new ClosingFunction<String, String>() {
1507             @Override
1508             public String apply(DeferredCloser closer3, String v1) throws Exception {
1509               return "value3";
1510             }
1511           },
1512           executor);
1513       fail();
1514     } catch (IllegalStateException expected5) {
1515     }
1516     try {
1517       closingFuture.transformAsync(
1518           new AsyncClosingFunction<String, String>() {
1519             @Override
1520             public ClosingFuture<String> apply(DeferredCloser closer2, String v) throws Exception {
1521               return ClosingFuture.from(immediateFuture("value3"));
1522             }
1523           },
1524           executor);
1525       fail();
1526     } catch (IllegalStateException expected4) {
1527     }
1528     try {
1529       closingFuture.catching(
1530           Exception.class,
1531           new ClosingFunction<Exception, String>() {
1532             @Override
1533             public String apply(DeferredCloser closer1, Exception x1) throws Exception {
1534               return "value3";
1535             }
1536           },
1537           executor);
1538       fail();
1539     } catch (IllegalStateException expected3) {
1540     }
1541     try {
1542       closingFuture.catchingAsync(
1543           Exception.class,
1544           new AsyncClosingFunction<Exception, String>() {
1545             @Override
1546             public ClosingFuture<String> apply(DeferredCloser closer, Exception x)
1547                 throws Exception {
1548               return ClosingFuture.from(immediateFuture("value3"));
1549             }
1550           },
1551           executor);
1552       fail();
1553     } catch (IllegalStateException expected2) {
1554     }
1555     try {
1556       ClosingFuture.whenAllComplete(asList(closingFuture));
1557       fail();
1558     } catch (IllegalStateException expected1) {
1559     }
1560     try {
1561       ClosingFuture.whenAllSucceed(asList(closingFuture));
1562       fail();
1563     } catch (IllegalStateException expected) {
1564     }
1565   }
1566 
1567   /** Asserts that marking this step a final step throws {@link IllegalStateException}. */
assertFinalStepThrowsIllegalStateException(ClosingFuture<?> closingFuture)1568   protected void assertFinalStepThrowsIllegalStateException(ClosingFuture<?> closingFuture) {
1569     try {
1570       closingFuture.finishToFuture();
1571       fail();
1572     } catch (IllegalStateException expected) {
1573     }
1574     try {
1575       closingFuture.finishToValueAndCloser(new NoOpValueAndCloserConsumer<>(), executor);
1576       fail();
1577     } catch (IllegalStateException expected) {
1578     }
1579   }
1580 
1581   // Avoid infinite recursion if a closeable's close() method throws RejectedExecutionException and
1582   // is closed using the direct executor.
testCloseThrowsRejectedExecutionException()1583   public void testCloseThrowsRejectedExecutionException() throws Exception {
1584     doThrow(new RejectedExecutionException()).when(mockCloseable).close();
1585     ClosingFuture<Closeable> closingFuture =
1586         ClosingFuture.submit(
1587             new ClosingCallable<Closeable>() {
1588               @Override
1589               public Closeable call(DeferredCloser closer) throws Exception {
1590                 return closer.eventuallyClose(mockCloseable, directExecutor());
1591               }
1592             },
1593             executor);
1594     assertThat(getFinalValue(closingFuture)).isEqualTo(mockCloseable);
1595     waitUntilClosed(closingFuture);
1596     verify(mockCloseable, timeout(1000)).close();
1597   }
1598 
1599   /**
1600    * Marks the given step final, waits for it to be finished, and returns the value.
1601    *
1602    * @throws ExecutionException if the step failed
1603    * @throws CancellationException if the step was cancelled
1604    */
getFinalValue(ClosingFuture<T> closingFuture)1605   abstract <T> T getFinalValue(ClosingFuture<T> closingFuture) throws ExecutionException;
1606 
1607   /** Marks the given step final, cancels it, and waits for the cancellation to happen. */
cancelFinalStepAndWait(ClosingFuture<TestCloseable> closingFuture)1608   abstract void cancelFinalStepAndWait(ClosingFuture<TestCloseable> closingFuture);
1609 
1610   /**
1611    * Marks the given step final and waits for it to fail. Expects the failure exception to match
1612    * {@link AbstractClosingFutureTest#exception}.
1613    */
assertFinallyFailsWithException(ClosingFuture<?> closingFuture)1614   abstract void assertFinallyFailsWithException(ClosingFuture<?> closingFuture);
1615 
1616   /** Waits for the given step to be canceled. */
assertBecomesCanceled(ClosingFuture<?> closingFuture)1617   abstract void assertBecomesCanceled(ClosingFuture<?> closingFuture) throws ExecutionException;
1618 
1619   /** Waits for the given step's closeables to be closed. */
waitUntilClosed(ClosingFuture<?> closingFuture)1620   void waitUntilClosed(ClosingFuture<?> closingFuture) {
1621     assertTrue(awaitUninterruptibly(closingFuture.whenClosedCountDown(), 1, SECONDS));
1622   }
1623 
assertThatFutureFailsWithException(Future<?> future)1624   void assertThatFutureFailsWithException(Future<?> future) {
1625     try {
1626       getUninterruptibly(future);
1627       fail("Expected future to fail: " + future);
1628     } catch (ExecutionException e) {
1629       assertThat(e).hasCauseThat().isSameInstanceAs(exception);
1630     }
1631   }
1632 
assertThatFutureBecomesCancelled(Future<?> future)1633   static void assertThatFutureBecomesCancelled(Future<?> future) throws ExecutionException {
1634     try {
1635       getUninterruptibly(future);
1636       fail("Expected future to be canceled: " + future);
1637     } catch (CancellationException expected) {
1638     }
1639   }
1640 
assertStillOpen(TestCloseable closeable1, TestCloseable... moreCloseables)1641   private static void assertStillOpen(TestCloseable closeable1, TestCloseable... moreCloseables)
1642       throws IOException {
1643     for (TestCloseable closeable : asList(closeable1, moreCloseables)) {
1644       assertWithMessage("%s.stillOpen()", closeable).that(closeable.stillOpen()).isTrue();
1645     }
1646   }
1647 
assertClosed(TestCloseable closeable1, TestCloseable... moreCloseables)1648   static void assertClosed(TestCloseable closeable1, TestCloseable... moreCloseables)
1649       throws IOException {
1650     for (TestCloseable closeable : asList(closeable1, moreCloseables)) {
1651       assertWithMessage("%s.isClosed()", closeable).that(closeable.awaitClosed()).isTrue();
1652     }
1653   }
1654 
failedClosingFuture()1655   private ClosingFuture<Object> failedClosingFuture() {
1656     return ClosingFuture.from(immediateFailedFuture(exception));
1657   }
1658 
assertNoExpectedFailures()1659   private void assertNoExpectedFailures() {
1660     assertWithMessage("executor was shut down")
1661         .that(shutdownAndAwaitTermination(executor, 10, SECONDS))
1662         .isTrue();
1663     assertWithMessage("closingExecutor was shut down")
1664         .that(shutdownAndAwaitTermination(closingExecutor, 10, SECONDS))
1665         .isTrue();
1666     if (!failures.isEmpty()) {
1667       StringWriter message = new StringWriter();
1668       PrintWriter writer = new PrintWriter(message);
1669       writer.println("Expected no failures, but found:");
1670       for (AssertionError failure : failures) {
1671         failure.printStackTrace(writer);
1672       }
1673       failures.clear();
1674       assertWithMessage(message.toString()).fail();
1675     }
1676   }
1677 
1678   static final class TestCloseable implements Closeable {
1679     private final CountDownLatch latch = new CountDownLatch(1);
1680     private final String name;
1681 
TestCloseable(String name)1682     TestCloseable(String name) {
1683       this.name = name;
1684     }
1685 
1686     @Override
close()1687     public void close() throws IOException {
1688       latch.countDown();
1689     }
1690 
awaitClosed()1691     boolean awaitClosed() {
1692       return awaitUninterruptibly(latch, 10, SECONDS);
1693     }
1694 
stillOpen()1695     boolean stillOpen() {
1696       return !awaitUninterruptibly(latch, 1, SECONDS);
1697     }
1698 
1699     @Override
toString()1700     public String toString() {
1701       return name;
1702     }
1703   }
1704 
1705   static final class Waiter {
1706     private final CountDownLatch started = new CountDownLatch(1);
1707     private final CountDownLatch canReturn = new CountDownLatch(1);
1708     private final CountDownLatch returned = new CountDownLatch(1);
1709     private Object proxy;
1710 
waitFor(Callable<V> callable)1711     <V> Callable<V> waitFor(Callable<V> callable) {
1712       return waitFor(callable, Callable.class);
1713     }
1714 
waitFor(ClosingCallable<V> closingCallable)1715     <V> ClosingCallable<V> waitFor(ClosingCallable<V> closingCallable) {
1716       return waitFor(closingCallable, ClosingCallable.class);
1717     }
1718 
waitFor(AsyncClosingCallable<V> asyncClosingCallable)1719     <V> AsyncClosingCallable<V> waitFor(AsyncClosingCallable<V> asyncClosingCallable) {
1720       return waitFor(asyncClosingCallable, AsyncClosingCallable.class);
1721     }
1722 
waitFor(ClosingFunction<T, U> closingFunction)1723     <T, U> ClosingFunction<T, U> waitFor(ClosingFunction<T, U> closingFunction) {
1724       return waitFor(closingFunction, ClosingFunction.class);
1725     }
1726 
waitFor(AsyncClosingFunction<T, U> asyncClosingFunction)1727     <T, U> AsyncClosingFunction<T, U> waitFor(AsyncClosingFunction<T, U> asyncClosingFunction) {
1728       return waitFor(asyncClosingFunction, AsyncClosingFunction.class);
1729     }
1730 
waitFor(CombiningCallable<V> combiningCallable)1731     <V> CombiningCallable<V> waitFor(CombiningCallable<V> combiningCallable) {
1732       return waitFor(combiningCallable, CombiningCallable.class);
1733     }
1734 
waitFor(AsyncCombiningCallable<V> asyncCombiningCallable)1735     <V> AsyncCombiningCallable<V> waitFor(AsyncCombiningCallable<V> asyncCombiningCallable) {
1736       return waitFor(asyncCombiningCallable, AsyncCombiningCallable.class);
1737     }
1738 
waitFor(ClosingFunction2<V1, V2, U> closingFunction2)1739     <V1, V2, U> ClosingFunction2<V1, V2, U> waitFor(ClosingFunction2<V1, V2, U> closingFunction2) {
1740       return waitFor(closingFunction2, ClosingFunction2.class);
1741     }
1742 
waitFor( AsyncClosingFunction2<V1, V2, U> asyncClosingFunction2)1743     <V1, V2, U> AsyncClosingFunction2<V1, V2, U> waitFor(
1744         AsyncClosingFunction2<V1, V2, U> asyncClosingFunction2) {
1745       return waitFor(asyncClosingFunction2, AsyncClosingFunction2.class);
1746     }
1747 
waitFor( ClosingFunction3<V1, V2, V3, U> closingFunction3)1748     <V1, V2, V3, U> ClosingFunction3<V1, V2, V3, U> waitFor(
1749         ClosingFunction3<V1, V2, V3, U> closingFunction3) {
1750       return waitFor(closingFunction3, ClosingFunction3.class);
1751     }
1752 
waitFor( ClosingFunction4<V1, V2, V3, V4, U> closingFunction4)1753     <V1, V2, V3, V4, U> ClosingFunction4<V1, V2, V3, V4, U> waitFor(
1754         ClosingFunction4<V1, V2, V3, V4, U> closingFunction4) {
1755       return waitFor(closingFunction4, ClosingFunction4.class);
1756     }
1757 
waitFor( ClosingFunction5<V1, V2, V3, V4, V5, U> closingFunction5)1758     <V1, V2, V3, V4, V5, U> ClosingFunction5<V1, V2, V3, V4, V5, U> waitFor(
1759         ClosingFunction5<V1, V2, V3, V4, V5, U> closingFunction5) {
1760       return waitFor(closingFunction5, ClosingFunction5.class);
1761     }
1762 
waitFor(final T delegate, final Class<T> type)1763     <T> T waitFor(final T delegate, final Class<T> type) {
1764       checkState(proxy == null);
1765       T proxyObject =
1766           Reflection.newProxy(
1767               type,
1768               new InvocationHandler() {
1769                 @Override
1770                 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
1771                   if (!method.getDeclaringClass().equals(type)) {
1772                     return method.invoke(delegate, args);
1773                   }
1774                   checkState(started.getCount() == 1);
1775                   started.countDown();
1776                   try {
1777                     return method.invoke(delegate, args);
1778                   } catch (InvocationTargetException e) {
1779                     throw e.getCause();
1780                   } finally {
1781                     awaitUninterruptibly(canReturn);
1782                     returned.countDown();
1783                   }
1784                 }
1785               });
1786       this.proxy = proxyObject;
1787       return proxyObject;
1788     }
1789 
awaitStarted()1790     void awaitStarted() {
1791       assertTrue(awaitUninterruptibly(started, 10, SECONDS));
1792     }
1793 
awaitReturned()1794     void awaitReturned() {
1795       canReturn.countDown();
1796       assertTrue(awaitUninterruptibly(returned, 10, SECONDS));
1797     }
1798   }
1799 
1800   static final class NoOpValueAndCloserConsumer<V> implements ValueAndCloserConsumer<V> {
1801     @Override
accept(ValueAndCloser<V> valueAndCloser)1802     public void accept(ValueAndCloser<V> valueAndCloser) {}
1803   }
1804 }
1805