/*
 * Copyright (C) 2017 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Lists.asList;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination;
import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static java.util.Arrays.asList;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import com.google.common.collect.ImmutableList;
import com.google.common.reflect.Reflection;
import com.google.common.truth.FailureStrategy;
import com.google.common.truth.StandardSubjectBuilder;
import com.google.common.util.concurrent.ClosingFuture.AsyncClosingCallable;
import com.google.common.util.concurrent.ClosingFuture.AsyncClosingFunction;
import com.google.common.util.concurrent.ClosingFuture.ClosingCallable;
import com.google.common.util.concurrent.ClosingFuture.ClosingFunction;
import com.google.common.util.concurrent.ClosingFuture.Combiner;
import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
import com.google.common.util.concurrent.ClosingFuture.Combiner2.AsyncClosingFunction2;
import com.google.common.util.concurrent.ClosingFuture.Combiner2.ClosingFunction2;
import com.google.common.util.concurrent.ClosingFuture.Combiner3.ClosingFunction3;
import com.google.common.util.concurrent.ClosingFuture.Combiner4.ClosingFunction4;
import com.google.common.util.concurrent.ClosingFuture.Combiner5.ClosingFunction5;
import com.google.common.util.concurrent.ClosingFuture.DeferredCloser;
import com.google.common.util.concurrent.ClosingFuture.Peeker;
import com.google.common.util.concurrent.ClosingFuture.ValueAndCloser;
import com.google.common.util.concurrent.ClosingFuture.ValueAndCloserConsumer;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.mockito.Mockito;

/**
 * Shared test cases for {@link ClosingFuture}. Concrete subclasses decide how a pipeline is
 * finished: through {@link ClosingFuture#finishToFuture()} or through {@link
 * ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)}.
 */
public abstract class AbstractClosingFutureTest extends TestCase {
  // TODO(dpb): Use Expect once that supports JUnit 3, or we can use JUnit 4.
  /** Failures recorded through {@link #expect}; checked in {@link #tearDown()}. */
  final List<AssertionError> failures = new ArrayList<>();

  /** Subject builder that records failures in {@link #failures} instead of throwing. */
  final StandardSubjectBuilder expect =
      StandardSubjectBuilder.forCustomFailureStrategy(
          new FailureStrategy() {
            @Override
            public void fail(AssertionError failure) {
              failures.add(failure);
            }
          });

  /** Executor on which pipeline steps run. */
  final ListeningExecutorService executor =
      MoreExecutors.listeningDecorator(newSingleThreadExecutor());

  /** Executor handed to the closing APIs. */
  final ExecutorService closingExecutor = newSingleThreadExecutor();

  final TestCloseable closeable1 = new TestCloseable("closeable1");
  final TestCloseable closeable2 = new TestCloseable("closeable2");
  final TestCloseable closeable3 = new TestCloseable("closeable3");
  final TestCloseable closeable4 = new TestCloseable("closeable4");

  final Waiter waiter = new Waiter();
  final CountDownLatch futureCancelled = new CountDownLatch(1);
  final Exception exception = new Exception();
  final Closeable mockCloseable = Mockito.mock(Closeable.class);

  /** Verifies that no failure was recorded through {@link #expect} before the usual teardown. */
  @Override
  protected void tearDown() throws Exception {
    assertNoExpectedFailures();
    super.tearDown();
  }

  /** A value supplied through {@code from()} is passed along but is not closed by the pipeline. */
  public void testFrom() throws Exception {
    ClosingFuture<TestCloseable> source =
        ClosingFuture.from(executor.submit(Callables.returning(closeable1)));
    ClosingFuture<String> closingFuture =
        source.transform(
            new ClosingFunction<TestCloseable, String>() {
              @Override
              public String apply(DeferredCloser closer, TestCloseable input) throws Exception {
                assertThat(input).isSameInstanceAs(closeable1);
                return "value";
              }
            },
            executor);
    assertThat(getFinalValue(closingFuture)).isEqualTo("value");
    waitUntilClosed(closingFuture);
    assertStillOpen(closeable1);
  }

  /** A pipeline started from a failed future finishes with that failure. */
  public void testFrom_failedInput() throws Exception {
    assertFinallyFailsWithException(failedClosingFuture());
  }

  /** A pipeline started from a cancelled future becomes cancelled. */
  public void testFrom_cancelledInput() throws Exception {
    assertBecomesCanceled(ClosingFuture.from(immediateCancelledFuture()));
  }

  /**
   * A value supplied through {@code eventuallyClosing()} is open while later steps run and closed
   * once the pipeline is closed.
   */
  public void testEventuallyClosing() throws Exception {
    ClosingFuture<TestCloseable> source =
        ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor);
    ClosingFuture<String> closingFuture =
        source.transform(
            new ClosingFunction<TestCloseable, String>() {
              @Override
              public String apply(DeferredCloser closer, TestCloseable input) throws Exception {
                assertThat(input).isSameInstanceAs(closeable1);
                assertStillOpen(closeable1);
                return "value";
              }
            },
            executor);
    assertThat(getFinalValue(closingFuture)).isEqualTo("value");
    waitUntilClosed(closingFuture);
    assertClosed(closeable1);
  }

  /** {@code eventuallyClosing()} over a failed future finishes with that failure. */
  public void testEventuallyClosing_failedInput() throws Exception {
    ListenableFuture<Closeable> failedInput = Futures.<Closeable>immediateFailedFuture(exception);
    assertFinallyFailsWithException(ClosingFuture.eventuallyClosing(failedInput, closingExecutor));
  }

  /** {@code eventuallyClosing()} over a cancelled future becomes cancelled. */
  public void testEventuallyClosing_cancelledInput() throws Exception {
    ListenableFuture<Closeable> cancelledInput = Futures.<Closeable>immediateCancelledFuture();
    assertBecomesCanceled(ClosingFuture.eventuallyClosing(cancelledInput, closingExecutor));
  }

  /**
   * When the pipeline is cancelled while the input callable is still running, the value it
   * eventually returns is closed, but only after the callable returns.
   */
  public void testEventuallyClosing_cancelledPipeline() throws Exception {
    ClosingFuture<TestCloseable> closingFuture =
        ClosingFuture.eventuallyClosing(
            executor.submit(
                waiter.waitFor(
                    new Callable<TestCloseable>() {
                      @Override
                      public TestCloseable call() throws InterruptedException {
                        awaitUninterruptibly(futureCancelled);
                        return closeable1;
                      }
                    })),
            closingExecutor);
    waiter.awaitStarted();
    cancelFinalStepAndWait(closingFuture);
    // The callable has not returned yet, so nothing has been closed.
    assertStillOpen(closeable1);
    waiter.awaitReturned();
    assertClosed(closeable1);
  }

  /** {@code eventuallyClosing()} over a callable that throws finishes with that failure. */
  public void testEventuallyClosing_throws() throws Exception {
    ListenableFuture<TestCloseable> throwingInput =
        executor.submit(
            new Callable<TestCloseable>() {
              @Override
              public TestCloseable call() throws Exception {
                throw exception;
              }
            });
    assertFinallyFailsWithException(
        ClosingFuture.eventuallyClosing(throwingInput, closingExecutor));
  }

  /**
   * Objects registered with the {@link DeferredCloser} in {@code submit()} stay open during later
   * steps and are closed afterwards; the returned value is not closed.
   */
  public void testSubmit() throws Exception {
    ClosingFuture<TestCloseable> submitted =
        ClosingFuture.submit(
            new ClosingCallable<TestCloseable>() {
              @Override
              public TestCloseable call(DeferredCloser closer) throws Exception {
                closer.eventuallyClose(closeable1, closingExecutor);
                closer.eventuallyClose(closeable2, closingExecutor);
                return closeable3;
              }
            },
            executor);
    ClosingFuture<String> closingFuture =
        submitted.transform(
            new ClosingFunction<TestCloseable, String>() {
              @Override
              public String apply(DeferredCloser closer, TestCloseable input) throws Exception {
                assertThat(input).isSameInstanceAs(closeable3);
                assertStillOpen(closeable1, closeable2, closeable3);
                return "value";
              }
            },
            executor);
    assertThat(getFinalValue(closingFuture)).isEqualTo("value");
    waitUntilClosed(closingFuture);
    assertClosed(closeable1, closeable2);
    assertStillOpen(closeable3);
  }

  /**
   * Objects registered by a {@code submit()} callable after the pipeline was cancelled are still
   * closed; the returned value is not.
   */
  public void testSubmit_cancelledPipeline() throws Exception {
    ClosingFuture<TestCloseable> closingFuture =
        ClosingFuture.submit(
            waiter.waitFor(
                new ClosingCallable<TestCloseable>() {
                  @Override
                  public TestCloseable call(DeferredCloser closer) throws Exception {
                    awaitUninterruptibly(futureCancelled);
                    closer.eventuallyClose(closeable1, closingExecutor);
                    closer.eventuallyClose(closeable2, closingExecutor);
                    return closeable3;
                  }
                }),
            executor);
    waiter.awaitStarted();
    cancelFinalStepAndWait(closingFuture);
    waiter.awaitReturned();
    assertClosed(closeable1, closeable2);
    assertStillOpen(closeable3);
  }

  /** Objects registered before a {@code submit()} callable throws are closed. */
  public void testSubmit_throws() throws Exception {
    ClosingFuture<Object> closingFuture =
        ClosingFuture.submit(
            new ClosingCallable<Object>() {
              @Override
              public Object call(DeferredCloser closer) throws Exception {
                closer.eventuallyClose(closeable1, closingExecutor);
                closer.eventuallyClose(closeable2, closingExecutor);
                throw exception;
              }
            },
            executor);
    assertFinallyFailsWithException(closingFuture);
    waitUntilClosed(closingFuture);
    assertClosed(closeable1, closeable2);
  }

  /**
   * {@code submitAsync()} yields the value of the returned pipeline; the object registered with
   * the outer closer is closed, the plain returned value is not.
   */
  public void testSubmitAsync() throws Exception {
    ClosingFuture<TestCloseable> closingFuture =
        ClosingFuture.submitAsync(
            new AsyncClosingCallable<TestCloseable>() {
              @Override
              public ClosingFuture<TestCloseable> call(DeferredCloser closer) {
                closer.eventuallyClose(closeable1, closingExecutor);
                return ClosingFuture.submit(
                    new ClosingCallable<TestCloseable>() {
                      @Override
                      public TestCloseable call(DeferredCloser deferredCloser) throws Exception {
                        return closeable2;
                      }
                    },
                    directExecutor());
              }
            },
            executor);
    assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
    waitUntilClosed(closingFuture);
    assertClosed(closeable1);
    assertStillOpen(closeable2);
  }

  /**
   * After cancellation, objects registered by the {@code submitAsync()} callable and by the
   * pipeline it returns are all closed.
   */
  public void testSubmitAsync_cancelledPipeline() throws Exception {
    ClosingFuture<TestCloseable> closingFuture =
        ClosingFuture.submitAsync(
            waiter.waitFor(
                new AsyncClosingCallable<TestCloseable>() {
                  @Override
                  public ClosingFuture<TestCloseable> call(DeferredCloser closer) throws Exception {
                    awaitUninterruptibly(futureCancelled);
                    closer.eventuallyClose(closeable1, closingExecutor);
                    closer.eventuallyClose(closeable2, closingExecutor);
                    return ClosingFuture.submit(
                        new ClosingCallable<TestCloseable>() {
                          @Override
                          public TestCloseable call(DeferredCloser deferredCloser)
                              throws Exception {
                            deferredCloser.eventuallyClose(closeable3, closingExecutor);
                            return closeable3;
                          }
                        },
                        directExecutor());
                  }
                }),
            executor);
    waiter.awaitStarted();
    cancelFinalStepAndWait(closingFuture);
    waiter.awaitReturned();
    assertClosed(closeable1, closeable2, closeable3);
  }

  /** Objects registered before a {@code submitAsync()} callable throws are closed. */
  public void testSubmitAsync_throws() throws Exception {
    ClosingFuture<Object> closingFuture =
        ClosingFuture.submitAsync(
            new AsyncClosingCallable<Object>() {
              @Override
              public ClosingFuture<Object> call(DeferredCloser closer) throws Exception {
                closer.eventuallyClose(closeable1, closingExecutor);
                closer.eventuallyClose(closeable2, closingExecutor);
                throw exception;
              }
            },
            executor);
    assertFinallyFailsWithException(closingFuture);
    waitUntilClosed(closingFuture);
    assertClosed(closeable1, closeable2);
  }

  /** An {@link AutoCloseable} (not only a {@link Closeable}) can be registered for closing. */
  public void testAutoCloseable() throws Exception {
    AutoCloseable autoCloseable = closeable1::close;
    ClosingFuture<String> closingFuture =
        ClosingFuture.submit(
            new ClosingCallable<String>() {
              @Override
              public String call(DeferredCloser closer) throws Exception {
                closer.eventuallyClose(autoCloseable, closingExecutor);
                return "";
              }
            },
            executor);
    assertThat(getFinalValue(closingFuture)).isEqualTo("");
    waitUntilClosed(closingFuture);
    assertClosed(closeable1);
  }

  /** The status future is pending while the step runs and then completes with {@code null}. */
  public void testStatusFuture() throws Exception {
    ClosingFuture<String> closingFuture =
        ClosingFuture.submit(
            waiter.waitFor(
                new ClosingCallable<String>() {
                  @Override
                  public String call(DeferredCloser closer) throws Exception {
                    return "value";
                  }
                }),
            executor);
    ListenableFuture<?> statusFuture = closingFuture.statusFuture();
    waiter.awaitStarted();
    assertThat(statusFuture.isDone()).isFalse();
    waiter.awaitReturned();
    assertThat(getUninterruptibly(statusFuture)).isNull();
  }

  /** The status future is pending while the step runs and then fails when the step throws. */
  public void testStatusFuture_failure() throws Exception {
    ClosingFuture<String> closingFuture =
        ClosingFuture.submit(
            waiter.waitFor(
                new ClosingCallable<String>() {
                  @Override
                  public String call(DeferredCloser closer) throws Exception {
                    throw exception;
                  }
                }),
            executor);
    ListenableFuture<?> statusFuture = closingFuture.statusFuture();
    waiter.awaitStarted();
    assertThat(statusFuture.isDone()).isFalse();
    waiter.awaitReturned();
    assertThatFutureFailsWithException(statusFuture);
  }

  /** Cancelling the status future does not stop the pipeline from producing its value. */
  public void testStatusFuture_cancelDoesNothing() throws Exception {
    ClosingFuture<String> closingFuture =
        ClosingFuture.submit(
            waiter.waitFor(
                new ClosingCallable<String>() {
                  @Override
                  public String call(DeferredCloser closer) throws Exception {
                    return "value";
                  }
                }),
            executor);
    ListenableFuture<?> statusFuture = closingFuture.statusFuture();
    waiter.awaitStarted();
    assertThat(statusFuture.isDone()).isFalse();
    statusFuture.cancel(true);
    assertThat(statusFuture.isCancelled()).isTrue();
    waiter.awaitReturned();
    assertThat(getFinalValue(closingFuture)).isEqualTo("value");
  }

  /**
   * Cancels a middle step of a five-step pipeline whose last step catches {@link
   * CancellationException}, and checks the cancellation state and closeable state of each step.
   */
  public void testCancel_caught() throws Exception {
    ClosingFuture<String> step0 = ClosingFuture.from(immediateFuture("value 0"));
    ClosingFuture<String> step1 =
        step0.transform(
            new ClosingFunction<String, String>() {
              @Override
              public String apply(DeferredCloser closer, String input) throws Exception {
                closer.eventuallyClose(closeable1, closingExecutor);
                return "value 1";
              }
            },
            executor);
    Waiter step2Waiter = new Waiter();
    ClosingFuture<String> step2 =
        step1.transform(
            step2Waiter.waitFor(
                new ClosingFunction<String, String>() {
                  @Override
                  public String apply(DeferredCloser closer, String input) throws Exception {
                    closer.eventuallyClose(closeable2, closingExecutor);
                    return "value 2";
                  }
                }),
            executor);
    ClosingFuture<String> step3 =
        step2.transform(
            new ClosingFunction<String, String>() {
              @Override
              public String apply(DeferredCloser closer, String input) throws Exception {
                closer.eventuallyClose(closeable3, closingExecutor);
                return "value 3";
              }
            },
            executor);
    Waiter step4Waiter = new Waiter();
    ClosingFuture<String> step4 =
        step3.catching(
            CancellationException.class,
            step4Waiter.waitFor(
                new ClosingFunction<CancellationException, String>() {
                  @Override
                  public String apply(DeferredCloser closer, CancellationException input)
                      throws Exception {
                    closer.eventuallyClose(closeable4, closingExecutor);
                    return "value 4";
                  }
                }),
            executor);

    // Pause in step 2.
    step2Waiter.awaitStarted();

    // Everything should still be open.
    assertStillOpen(closeable1, closeable2, closeable3, closeable4);

    // Cancel step 3, resume step 2, and pause in step 4.
    assertWithMessage("step3.cancel()").that(step3.cancel(false)).isTrue();
    step2Waiter.awaitReturned();
    step4Waiter.awaitStarted();

    // Step 1 is not cancelled because it was done.
    assertWithMessage("step1.statusFuture().isCancelled()")
        .that(step1.statusFuture().isCancelled())
        .isFalse();
    // But its closeable is closed.
    assertClosed(closeable1);

    // Step 2 is cancelled because it wasn't complete.
    assertWithMessage("step2.statusFuture().isCancelled()")
        .that(step2.statusFuture().isCancelled())
        .isTrue();
    // Its closeable is closed.
    assertClosed(closeable2);

    // Step 3 was cancelled before it began
    assertWithMessage("step3.statusFuture().isCancelled()")
        .that(step3.statusFuture().isCancelled())
        .isTrue();
    // Its closeable is still open.
    assertStillOpen(closeable3);

    // Step 4 is not cancelled, because it caught the cancellation.
    assertWithMessage("step4.statusFuture().isCancelled()")
        .that(step4.statusFuture().isCancelled())
        .isFalse();
    // Its closeable isn't closed yet.
    assertStillOpen(closeable4);

    // Resume step 4 and complete.
    step4Waiter.awaitReturned();
    assertThat(getFinalValue(step4)).isEqualTo("value 4");

    // Step 4's closeable is now closed.
    assertClosed(closeable4);
    // Step 3 still never ran, so its closeable should still be open.
    assertStillOpen(closeable3);
  }

  /**
   * Objects registered in a {@code transform()} step stay open during the next step and are closed
   * afterwards; the value the step returned is not closed.
   */
  public void testTransform() throws Exception {
    ClosingFuture<String> source = ClosingFuture.from(immediateFuture("value"));
    ClosingFuture<String> closingFuture =
        source
            .transform(
                new ClosingFunction<String, TestCloseable>() {
                  @Override
                  public TestCloseable apply(DeferredCloser closer, String input)
                      throws Exception {
                    closer.eventuallyClose(closeable1, closingExecutor);
                    closer.eventuallyClose(closeable2, closingExecutor);
                    return closeable3;
                  }
                },
                executor)
            .transform(
                new ClosingFunction<TestCloseable, String>() {
                  @Override
                  public String apply(DeferredCloser closer, TestCloseable input) throws Exception {
                    assertThat(input).isSameInstanceAs(closeable3);
                    assertStillOpen(closeable1, closeable2, closeable3);
                    return "value";
                  }
                },
                executor);
    assertThat(getFinalValue(closingFuture)).isEqualTo("value");
    waitUntilClosed(closingFuture);
    assertClosed(closeable1, closeable2);
    assertStillOpen(closeable3);
  }

  /**
   * After cancellation, objects registered by an earlier step and by the running {@code
   * transform()} step are closed; the value the running step returns is not.
   */
  public void testTransform_cancelledPipeline() throws Exception {
    String value = "value";
    ClosingFuture<TestCloseable> closingFuture =
        ClosingFuture.from(immediateFuture(value))
            .transform(
                new ClosingFunction<String, TestCloseable>() {
                  @Override
                  public TestCloseable apply(DeferredCloser closer, String input)
                      throws Exception {
                    return closer.eventuallyClose(closeable1, closingExecutor);
                  }
                },
                executor)
            .transform(
                waiter.waitFor(
                    new ClosingFunction<TestCloseable, TestCloseable>() {
                      @Override
                      public TestCloseable apply(DeferredCloser closer, TestCloseable input)
                          throws Exception {
                        awaitUninterruptibly(futureCancelled);
                        closer.eventuallyClose(closeable2, closingExecutor);
                        closer.eventuallyClose(closeable3, closingExecutor);
                        return closeable4;
                      }
                    }),
                executor);
    waiter.awaitStarted();
    cancelFinalStepAndWait(closingFuture);
    waiter.awaitReturned();
    assertClosed(closeable1, closeable2, closeable3);
    assertStillOpen(closeable4);
  }

  /** Objects registered before a {@code transform()} function throws are closed. */
  public void testTransform_throws() throws Exception {
    ClosingFuture<Object> closingFuture =
        ClosingFuture.from(immediateFuture("value"))
            .transform(
                new ClosingFunction<String, Object>() {
                  @Override
                  public Object apply(DeferredCloser closer, String input) throws Exception {
                    closer.eventuallyClose(closeable1, closingExecutor);
                    closer.eventuallyClose(closeable2, closingExecutor);
                    throw exception;
                  }
                },
                executor);
    assertFinallyFailsWithException(closingFuture);
    waitUntilClosed(closingFuture);
    assertClosed(closeable1, closeable2);
  }

  /**
   * Objects registered in a {@code transformAsync()} step, and the eventually-closing value of the
   * pipeline it returns, stay open during the next step and are closed afterwards.
   */
  public void testTransformAsync() throws Exception {
    ClosingFuture<String> source = ClosingFuture.from(immediateFuture("value"));
    ClosingFuture<String> closingFuture =
        source
            .transformAsync(
                new AsyncClosingFunction<String, TestCloseable>() {
                  @Override
                  public ClosingFuture<TestCloseable> apply(DeferredCloser closer, String input)
                      throws Exception {
                    closer.eventuallyClose(closeable1, closingExecutor);
                    closer.eventuallyClose(closeable2, closingExecutor);
                    return ClosingFuture.eventuallyClosing(
                        immediateFuture(closeable3), closingExecutor);
                  }
                },
                executor)
            .transform(
                new ClosingFunction<TestCloseable, String>() {
                  @Override
                  public String apply(DeferredCloser closer, TestCloseable input) throws Exception {
                    assertThat(input).isSameInstanceAs(closeable3);
                    assertStillOpen(closeable1, closeable2, closeable3);
                    return "value";
                  }
                },
                executor);
    assertThat(getFinalValue(closingFuture)).isEqualTo("value");
    waitUntilClosed(closingFuture);
    assertClosed(closeable1, closeable2, closeable3);
  }

  /**
   * After cancellation, everything produced by a running {@code transformAsync()} function is
   * closed, but only once the function has returned.
   */
  public void testTransformAsync_cancelledPipeline() throws Exception {
    ClosingFuture<TestCloseable> closingFuture =
        ClosingFuture.from(immediateFuture("value"))
            .transformAsync(
                waiter.waitFor(
                    new AsyncClosingFunction<String, TestCloseable>() {
                      @Override
                      public ClosingFuture<TestCloseable> apply(
                          DeferredCloser closer, String input) throws Exception {
                        awaitUninterruptibly(futureCancelled);
                        closer.eventuallyClose(closeable1, closingExecutor);
                        closer.eventuallyClose(closeable2, closingExecutor);
                        return ClosingFuture.eventuallyClosing(
                            immediateFuture(closeable3), closingExecutor);
                      }
                    }),
                executor);
    waiter.awaitStarted();
    cancelFinalStepAndWait(closingFuture);
    // The function has not returned yet, so nothing has been closed.
    assertStillOpen(closeable1, closeable2, closeable3);
    waiter.awaitReturned();
    assertClosed(closeable1, closeable2, closeable3);
  }

  /** Objects registered before a {@code transformAsync()} function throws are closed. */
  public void testTransformAsync_throws() throws Exception {
    ClosingFuture<Object> closingFuture =
        ClosingFuture.from(immediateFuture("value"))
            .transformAsync(
                new AsyncClosingFunction<String, Object>() {
                  @Override
                  public ClosingFuture<Object> apply(DeferredCloser closer, String input)
                      throws Exception {
                    closer.eventuallyClose(closeable1, closingExecutor);
                    closer.eventuallyClose(closeable2, closingExecutor);
                    throw exception;
                  }
                },
                executor);
    assertFinallyFailsWithException(closingFuture);
    waitUntilClosed(closingFuture);
    assertClosed(closeable1, closeable2);
  }

  /**
   * Objects registered by a {@code transformAsync()} function that returns a failed pipeline are
   * closed.
   */
  public void testTransformAsync_failed() throws Exception {
    ClosingFuture<Object> closingFuture =
        ClosingFuture.from(immediateFuture("value"))
            .transformAsync(
                new AsyncClosingFunction<String, Object>() {
                  @Override
                  public ClosingFuture<Object> apply(DeferredCloser closer, String input)
                      throws Exception {
                    closer.eventuallyClose(closeable1, closingExecutor);
                    closer.eventuallyClose(closeable2, closingExecutor);
                    return failedClosingFuture();
                  }
                },
                executor);
    assertFinallyFailsWithException(closingFuture);
    waitUntilClosed(closingFuture);
    assertClosed(closeable1, closeable2);
  }

  /**
   * A plain {@link AsyncFunction} adapted with {@code withoutCloser()} can be used as a {@code
   * transformAsync()} step; the earlier step's closeable is open during it and closed afterwards.
   */
  public void testTransformAsync_withoutCloser() throws Exception {
    ClosingFuture<TestCloseable> submitted =
        ClosingFuture.submit(
            new ClosingCallable<TestCloseable>() {
              @Override
              public TestCloseable call(DeferredCloser closer) throws Exception {
                return closer.eventuallyClose(closeable1, closingExecutor);
              }
            },
            executor);
    ClosingFuture<String> closingFuture =
        submitted.transformAsync(
            ClosingFuture.withoutCloser(
                new AsyncFunction<TestCloseable, String>() {
                  @Override
                  public ListenableFuture<String> apply(TestCloseable input) throws Exception {
                    assertThat(input).isSameInstanceAs(closeable1);
                    assertStillOpen(closeable1);
                    return immediateFuture("value");
                  }
                }),
            executor);
    assertThat(getFinalValue(closingFuture)).isEqualTo("value");
    waitUntilClosed(closingFuture);
    assertClosed(closeable1);
  }

  /**
   * {@code whenAllComplete().call()} runs even with a failed input. The {@link Peeker} returns
   * successful inputs, throws for failed inputs and for futures that are not inputs, and cannot be
   * used after the combining call.
   */
  public void testWhenAllComplete_call() throws Exception {
    final ClosingFuture<String> input1 = ClosingFuture.from(immediateFuture("value1"));
    final ClosingFuture<Object> input2Failed = failedClosingFuture();
    final ClosingFuture<String> nonInput = ClosingFuture.from(immediateFuture("value3"));
    final AtomicReference<ClosingFuture.Peeker> capturedPeeker = new AtomicReference<>();
    ClosingFuture<TestCloseable> closingFuture =
        ClosingFuture.whenAllComplete(ImmutableList.of(input1, input2Failed))
            .call(
                new CombiningCallable<TestCloseable>() {
                  @Override
                  public TestCloseable call(DeferredCloser closer, Peeker peeker) throws Exception {
                    closer.eventuallyClose(closeable1, closingExecutor);
                    assertThat(peeker.getDone(input1)).isSameInstanceAs("value1");
                    try {
                      peeker.getDone(input2Failed);
                      fail("Peeker.getDone() should fail for failed inputs");
                    } catch (ExecutionException expected) {
                    }
                    try {
                      peeker.getDone(nonInput);
                      fail("Peeker should not be able to peek into non-input ClosingFuture.");
                    } catch (IllegalArgumentException expected) {
                    }
                    capturedPeeker.set(peeker);
                    return closeable2;
                  }
                },
                executor);
    assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
    waitUntilClosed(closingFuture);
    assertStillOpen(closeable2);
    assertClosed(closeable1);
    assertThrows(IllegalStateException.class, () -> capturedPeeker.get().getDone(input1));
  }

  /**
   * After cancellation of a {@code whenAllComplete().call()} pipeline, the eventually-closing
   * input and the object registered by the callable are closed; the returned value is not.
   */
  public void testWhenAllComplete_call_cancelledPipeline() throws Exception {
    ClosingFuture<TestCloseable> closingFuture =
        ClosingFuture.whenAllComplete(
                ImmutableList.of(
                    ClosingFuture.from(immediateFuture(closeable1)),
                    ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
            .call(
                waiter.waitFor(
                    new CombiningCallable<TestCloseable>() {
                      @Override
                      public TestCloseable call(DeferredCloser closer, Peeker peeker)
                          throws Exception {
                        awaitUninterruptibly(futureCancelled);
                        closer.eventuallyClose(closeable1, closingExecutor);
                        return closeable3;
                      }
                    }),
                executor);
    waiter.awaitStarted();
    cancelFinalStepAndWait(closingFuture);
    waiter.awaitReturned();
    assertClosed(closeable1, closeable2);
    assertStillOpen(closeable3);
  }

  /**
   * When a {@code whenAllComplete().call()} callable throws, the eventually-closing input and the
   * object it registered are closed; the input supplied through {@code from()} stays open.
   */
  public void testWhenAllComplete_call_throws() throws Exception {
    ClosingFuture<Object> closingFuture =
        ClosingFuture.whenAllComplete(
                ImmutableList.of(
                    ClosingFuture.from(immediateFuture(closeable1)),
                    ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
            .call(
                new CombiningCallable<Object>() {
                  @Override
                  public Object call(DeferredCloser closer, Peeker peeker) throws Exception {
                    closer.eventuallyClose(closeable3, closingExecutor);
                    throw exception;
                  }
                },
                executor);
    assertFinallyFailsWithException(closingFuture);
    waitUntilClosed(closingFuture);
    assertStillOpen(closeable1);
    assertClosed(closeable2, closeable3);
  }

  /**
   * Asynchronous counterpart of {@link #testWhenAllComplete_call()}: same {@link Peeker} checks,
   * and the eventually-closing value of the returned pipeline is closed as well.
   */
  public void testWhenAllComplete_callAsync() throws Exception {
    final ClosingFuture<String> input1 = ClosingFuture.from(immediateFuture("value1"));
    final ClosingFuture<Object> input2Failed = failedClosingFuture();
    final ClosingFuture<String> nonInput = ClosingFuture.from(immediateFuture("value3"));
    final AtomicReference<ClosingFuture.Peeker> capturedPeeker = new AtomicReference<>();
    ClosingFuture<TestCloseable> closingFuture =
        ClosingFuture.whenAllComplete(ImmutableList.of(input1, input2Failed))
            .callAsync(
                new AsyncCombiningCallable<TestCloseable>() {
                  @Override
                  public ClosingFuture<TestCloseable> call(DeferredCloser closer, Peeker peeker)
                      throws Exception {
                    closer.eventuallyClose(closeable1, closingExecutor);
                    assertThat(peeker.getDone(input1)).isSameInstanceAs("value1");
                    try {
                      peeker.getDone(input2Failed);
                      fail("Peeker should fail for failed inputs");
                    } catch (ExecutionException expected) {
                    }
                    try {
                      peeker.getDone(nonInput);
                      fail("Peeker should not be able to peek into non-input ClosingFuture.");
                    } catch (IllegalArgumentException expected) {
                    }
                    capturedPeeker.set(peeker);
                    return ClosingFuture.eventuallyClosing(
                        immediateFuture(closeable2), closingExecutor);
                  }
                },
                executor);
    assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
    waitUntilClosed(closingFuture);
    assertClosed(closeable1, closeable2);
    assertThrows(IllegalStateException.class, () -> capturedPeeker.get().getDone(input1));
  }

  /**
   * After cancellation of a {@code whenAllComplete().callAsync()} pipeline, the registered object,
   * the eventually-closing input and the returned pipeline's value are all closed.
   */
  public void testWhenAllComplete_callAsync_cancelledPipeline() throws Exception {
    ClosingFuture<TestCloseable> closingFuture =
        ClosingFuture.whenAllComplete(
                ImmutableList.of(
                    ClosingFuture.from(immediateFuture(closeable1)),
                    ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
            .callAsync(
                waiter.waitFor(
                    new AsyncCombiningCallable<TestCloseable>() {
                      @Override
                      public ClosingFuture<TestCloseable> call(DeferredCloser closer, Peeker peeker)
                          throws Exception {
                        awaitUninterruptibly(futureCancelled);
                        closer.eventuallyClose(closeable1, closingExecutor);
                        return ClosingFuture.eventuallyClosing(
                            immediateFuture(closeable3), closingExecutor);
                      }
                    }),
                executor);
    waiter.awaitStarted();
    cancelFinalStepAndWait(closingFuture);
    waiter.awaitReturned();
    assertClosed(closeable1, closeable2, closeable3);
  }

  /**
   * When a {@code whenAllComplete().callAsync()} callable throws, the eventually-closing input and
   * the object it registered are closed; the input supplied through {@code from()} stays open.
   */
  public void testWhenAllComplete_callAsync_throws() throws Exception {
    ClosingFuture<Object> closingFuture =
        ClosingFuture.whenAllComplete(
                ImmutableList.of(
                    ClosingFuture.from(immediateFuture(closeable1)),
                    ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)))
            .callAsync(
                new AsyncCombiningCallable<Object>() {
                  @Override
                  public ClosingFuture<Object> call(DeferredCloser closer, Peeker peeker)
                      throws Exception {
                    closer.eventuallyClose(closeable3, closingExecutor);
                    throw exception;
                  }
                },
                executor);
    assertFinallyFailsWithException(closingFuture);
    waitUntilClosed(closingFuture);
    assertStillOpen(closeable1);
    assertClosed(closeable2, closeable3);
  }

  // We don't need to test the happy case for SuccessfulCombiner.call(Async) because it's the same
  // as Combiner.
877 testWhenAllSucceed_call_failedInput()878 public void testWhenAllSucceed_call_failedInput() throws Exception { 879 assertFinallyFailsWithException( 880 ClosingFuture.whenAllSucceed( 881 ImmutableList.of( 882 ClosingFuture.from(immediateFuture("value")), failedClosingFuture())) 883 .call( 884 new CombiningCallable<Object>() { 885 @Override 886 public Object call(DeferredCloser closer, Peeker peeker) throws Exception { 887 expect.fail(); 888 throw new AssertionError(); 889 } 890 }, 891 executor)); 892 } 893 testWhenAllSucceed_callAsync_failedInput()894 public void testWhenAllSucceed_callAsync_failedInput() throws Exception { 895 assertFinallyFailsWithException( 896 ClosingFuture.whenAllSucceed( 897 ImmutableList.of( 898 ClosingFuture.from(immediateFuture("value")), failedClosingFuture())) 899 .callAsync( 900 new AsyncCombiningCallable<Object>() { 901 @Override 902 public ClosingFuture<Object> call(DeferredCloser closer, Peeker peeker) 903 throws Exception { 904 expect.fail(); 905 throw new AssertionError(); 906 } 907 }, 908 executor)); 909 } 910 testWhenAllSucceed2_call()911 public void testWhenAllSucceed2_call() throws ExecutionException, IOException { 912 ClosingFuture<TestCloseable> closingFuture = 913 ClosingFuture.whenAllSucceed( 914 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 915 ClosingFuture.from(immediateFuture("value1"))) 916 .call( 917 new ClosingFunction2<TestCloseable, String, TestCloseable>() { 918 @Override 919 public TestCloseable apply(DeferredCloser closer, TestCloseable v1, String v2) 920 throws Exception { 921 assertThat(v1).isEqualTo(closeable1); 922 assertThat(v2).isEqualTo("value1"); 923 assertStillOpen(closeable1); 924 closer.eventuallyClose(closeable2, closingExecutor); 925 return closeable2; 926 } 927 }, 928 executor); 929 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); 930 waitUntilClosed(closingFuture); 931 assertClosed(closeable1, closeable2); 932 } 933 
testWhenAllSucceed2_call_failedInput()934 public void testWhenAllSucceed2_call_failedInput() throws ExecutionException, IOException { 935 ClosingFuture<Object> closingFuture = 936 ClosingFuture.whenAllSucceed( 937 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 938 failedClosingFuture()) 939 .call( 940 new ClosingFunction2<TestCloseable, Object, Object>() { 941 @Override 942 public Object apply(DeferredCloser closer, TestCloseable v1, Object v2) 943 throws Exception { 944 expect.fail(); 945 throw new AssertionError(); 946 } 947 }, 948 executor); 949 assertFinallyFailsWithException(closingFuture); 950 waitUntilClosed(closingFuture); 951 assertClosed(closeable1); 952 } 953 testWhenAllSucceed2_call_cancelledPipeline()954 public void testWhenAllSucceed2_call_cancelledPipeline() throws Exception { 955 ClosingFuture<TestCloseable> closingFuture = 956 ClosingFuture.whenAllSucceed( 957 ClosingFuture.from(immediateFuture(closeable1)), 958 ClosingFuture.from(immediateFuture(closeable2))) 959 .call( 960 waiter.waitFor( 961 new ClosingFunction2<TestCloseable, TestCloseable, TestCloseable>() { 962 @Override 963 public TestCloseable apply( 964 DeferredCloser closer, TestCloseable v1, TestCloseable v2) 965 throws Exception { 966 awaitUninterruptibly(futureCancelled); 967 closer.eventuallyClose(closeable1, closingExecutor); 968 closer.eventuallyClose(closeable2, closingExecutor); 969 return closeable3; 970 } 971 }), 972 executor); 973 waiter.awaitStarted(); 974 cancelFinalStepAndWait(closingFuture); 975 // not closed until the function returns 976 assertStillOpen(closeable1, closeable2); 977 waiter.awaitReturned(); 978 assertClosed(closeable1, closeable2); 979 assertStillOpen(closeable3); 980 } 981 testWhenAllSucceed2_call_throws()982 public void testWhenAllSucceed2_call_throws() throws Exception { 983 ClosingFuture<Object> closingFuture = 984 ClosingFuture.whenAllSucceed( 985 ClosingFuture.from(immediateFuture(closeable1)), 986 
ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)) 987 .call( 988 new ClosingFunction2<TestCloseable, TestCloseable, Object>() { 989 @Override 990 public Object apply(DeferredCloser closer, TestCloseable v1, TestCloseable v2) 991 throws Exception { 992 closer.eventuallyClose(closeable3, closingExecutor); 993 throw exception; 994 } 995 }, 996 executor); 997 assertFinallyFailsWithException(closingFuture); 998 waitUntilClosed(closingFuture); 999 assertStillOpen(closeable1); 1000 assertClosed(closeable2, closeable3); 1001 } 1002 testWhenAllSucceed2_callAsync()1003 public void testWhenAllSucceed2_callAsync() throws ExecutionException, IOException { 1004 ClosingFuture<TestCloseable> closingFuture = 1005 ClosingFuture.whenAllSucceed( 1006 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1007 ClosingFuture.from(immediateFuture("value1"))) 1008 .callAsync( 1009 new AsyncClosingFunction2<TestCloseable, String, TestCloseable>() { 1010 @Override 1011 public ClosingFuture<TestCloseable> apply( 1012 DeferredCloser closer, TestCloseable v1, String v2) throws Exception { 1013 assertThat(v1).isEqualTo(closeable1); 1014 assertThat(v2).isEqualTo("value1"); 1015 assertStillOpen(closeable1); 1016 closer.eventuallyClose(closeable2, closingExecutor); 1017 return ClosingFuture.eventuallyClosing( 1018 immediateFuture(closeable3), closingExecutor); 1019 } 1020 }, 1021 executor); 1022 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable3); 1023 waitUntilClosed(closingFuture); 1024 assertClosed(closeable1, closeable2, closeable3); 1025 } 1026 testWhenAllSucceed2_callAsync_failedInput()1027 public void testWhenAllSucceed2_callAsync_failedInput() throws ExecutionException, IOException { 1028 ClosingFuture<Object> closingFuture = 1029 ClosingFuture.whenAllSucceed( 1030 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1031 failedClosingFuture()) 1032 .callAsync( 1033 new 
AsyncClosingFunction2<TestCloseable, Object, Object>() { 1034 @Override 1035 public ClosingFuture<Object> apply( 1036 DeferredCloser closer, TestCloseable v1, Object v2) throws Exception { 1037 expect.fail(); 1038 throw new AssertionError(); 1039 } 1040 }, 1041 executor); 1042 assertFinallyFailsWithException(closingFuture); 1043 waitUntilClosed(closingFuture); 1044 assertClosed(closeable1); 1045 } 1046 testWhenAllSucceed2_callAsync_cancelledPipeline()1047 public void testWhenAllSucceed2_callAsync_cancelledPipeline() throws Exception { 1048 ClosingFuture<TestCloseable> closingFuture = 1049 ClosingFuture.whenAllSucceed( 1050 ClosingFuture.from(immediateFuture(closeable1)), 1051 ClosingFuture.from(immediateFuture(closeable2))) 1052 .callAsync( 1053 waiter.waitFor( 1054 new AsyncClosingFunction2<TestCloseable, TestCloseable, TestCloseable>() { 1055 @Override 1056 public ClosingFuture<TestCloseable> apply( 1057 DeferredCloser closer, TestCloseable v1, TestCloseable v2) 1058 throws Exception { 1059 awaitUninterruptibly(futureCancelled); 1060 closer.eventuallyClose(closeable1, closingExecutor); 1061 closer.eventuallyClose(closeable2, closingExecutor); 1062 return ClosingFuture.eventuallyClosing( 1063 immediateFuture(closeable3), closingExecutor); 1064 } 1065 }), 1066 executor); 1067 waiter.awaitStarted(); 1068 cancelFinalStepAndWait(closingFuture); 1069 // not closed until the function returns 1070 assertStillOpen(closeable1, closeable2, closeable3); 1071 waiter.awaitReturned(); 1072 assertClosed(closeable1, closeable2, closeable3); 1073 } 1074 testWhenAllSucceed2_callAsync_throws()1075 public void testWhenAllSucceed2_callAsync_throws() throws Exception { 1076 ClosingFuture<Object> closingFuture = 1077 ClosingFuture.whenAllSucceed( 1078 ClosingFuture.from(immediateFuture(closeable1)), 1079 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)) 1080 .callAsync( 1081 new AsyncClosingFunction2<TestCloseable, TestCloseable, Object>() { 1082 @Override 
1083 public ClosingFuture<Object> apply( 1084 DeferredCloser closer, TestCloseable v1, TestCloseable v2) throws Exception { 1085 closer.eventuallyClose(closeable3, closingExecutor); 1086 throw exception; 1087 } 1088 }, 1089 executor); 1090 assertFinallyFailsWithException(closingFuture); 1091 waitUntilClosed(closingFuture); 1092 assertStillOpen(closeable1); 1093 assertClosed(closeable2, closeable3); 1094 } 1095 testWhenAllSucceed3_call()1096 public void testWhenAllSucceed3_call() throws ExecutionException, IOException { 1097 ClosingFuture<TestCloseable> closingFuture = 1098 ClosingFuture.whenAllSucceed( 1099 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1100 ClosingFuture.from(immediateFuture("value2")), 1101 ClosingFuture.from(immediateFuture("value3"))) 1102 .call( 1103 new ClosingFunction3<TestCloseable, String, String, TestCloseable>() { 1104 @Override 1105 public TestCloseable apply( 1106 DeferredCloser closer, TestCloseable v1, String v2, String v3) 1107 throws Exception { 1108 assertThat(v1).isEqualTo(closeable1); 1109 assertThat(v2).isEqualTo("value2"); 1110 assertThat(v3).isEqualTo("value3"); 1111 assertStillOpen(closeable1); 1112 closer.eventuallyClose(closeable2, closingExecutor); 1113 return closeable2; 1114 } 1115 }, 1116 executor); 1117 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); 1118 waitUntilClosed(closingFuture); 1119 assertClosed(closeable1, closeable2); 1120 } 1121 testWhenAllSucceed3_call_failedInput()1122 public void testWhenAllSucceed3_call_failedInput() throws ExecutionException, IOException { 1123 ClosingFuture<Object> closingFuture = 1124 ClosingFuture.whenAllSucceed( 1125 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1126 failedClosingFuture(), 1127 ClosingFuture.from(immediateFuture("value3"))) 1128 .call( 1129 new ClosingFunction3<TestCloseable, Object, String, Object>() { 1130 @Override 1131 public Object apply(DeferredCloser closer, 
TestCloseable v1, Object v2, String v3) 1132 throws Exception { 1133 expect.fail(); 1134 throw new AssertionError(); 1135 } 1136 }, 1137 executor); 1138 assertFinallyFailsWithException(closingFuture); 1139 waitUntilClosed(closingFuture); 1140 assertClosed(closeable1); 1141 } 1142 testWhenAllSucceed3_call_cancelledPipeline()1143 public void testWhenAllSucceed3_call_cancelledPipeline() throws Exception { 1144 ClosingFuture<TestCloseable> closingFuture = 1145 ClosingFuture.whenAllSucceed( 1146 ClosingFuture.from(immediateFuture(closeable1)), 1147 ClosingFuture.from(immediateFuture(closeable2)), 1148 ClosingFuture.from(immediateFuture("value3"))) 1149 .call( 1150 waiter.waitFor( 1151 new ClosingFunction3<TestCloseable, TestCloseable, String, TestCloseable>() { 1152 @Override 1153 public TestCloseable apply( 1154 DeferredCloser closer, TestCloseable v1, TestCloseable v2, String v3) 1155 throws Exception { 1156 awaitUninterruptibly(futureCancelled); 1157 closer.eventuallyClose(closeable1, closingExecutor); 1158 closer.eventuallyClose(closeable2, closingExecutor); 1159 return closeable3; 1160 } 1161 }), 1162 executor); 1163 waiter.awaitStarted(); 1164 cancelFinalStepAndWait(closingFuture); 1165 // not closed until the function returns 1166 assertStillOpen(closeable1, closeable2); 1167 waiter.awaitReturned(); 1168 assertClosed(closeable1, closeable2); 1169 assertStillOpen(closeable3); 1170 } 1171 testWhenAllSucceed3_call_throws()1172 public void testWhenAllSucceed3_call_throws() throws Exception { 1173 ClosingFuture<Object> closingFuture = 1174 ClosingFuture.whenAllSucceed( 1175 ClosingFuture.from(immediateFuture(closeable1)), 1176 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor), 1177 ClosingFuture.from(immediateFuture("value3"))) 1178 .call( 1179 new ClosingFunction3<TestCloseable, TestCloseable, String, Object>() { 1180 @Override 1181 public Object apply( 1182 DeferredCloser closer, TestCloseable v1, TestCloseable v2, String v3) 1183 throws 
Exception { 1184 closer.eventuallyClose(closeable3, closingExecutor); 1185 throw exception; 1186 } 1187 }, 1188 executor); 1189 assertFinallyFailsWithException(closingFuture); 1190 waitUntilClosed(closingFuture); 1191 assertStillOpen(closeable1); 1192 assertClosed(closeable2, closeable3); 1193 } 1194 testWhenAllSucceed4_call()1195 public void testWhenAllSucceed4_call() throws ExecutionException, IOException { 1196 ClosingFuture<TestCloseable> closingFuture = 1197 ClosingFuture.whenAllSucceed( 1198 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1199 ClosingFuture.from(immediateFuture("value2")), 1200 ClosingFuture.from(immediateFuture("value3")), 1201 ClosingFuture.from(immediateFuture("value4"))) 1202 .call( 1203 new ClosingFunction4<TestCloseable, String, String, String, TestCloseable>() { 1204 @Override 1205 public TestCloseable apply( 1206 DeferredCloser closer, TestCloseable v1, String v2, String v3, String v4) 1207 throws Exception { 1208 assertThat(v1).isEqualTo(closeable1); 1209 assertThat(v2).isEqualTo("value2"); 1210 assertThat(v3).isEqualTo("value3"); 1211 assertThat(v4).isEqualTo("value4"); 1212 assertStillOpen(closeable1); 1213 closer.eventuallyClose(closeable2, closingExecutor); 1214 return closeable2; 1215 } 1216 }, 1217 executor); 1218 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); 1219 waitUntilClosed(closingFuture); 1220 assertClosed(closeable1, closeable2); 1221 } 1222 testWhenAllSucceed4_call_failedInput()1223 public void testWhenAllSucceed4_call_failedInput() throws ExecutionException, IOException { 1224 ClosingFuture<Object> closingFuture = 1225 ClosingFuture.whenAllSucceed( 1226 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1227 failedClosingFuture(), 1228 ClosingFuture.from(immediateFuture("value3")), 1229 ClosingFuture.from(immediateFuture("value4"))) 1230 .call( 1231 new ClosingFunction4<TestCloseable, Object, String, String, Object>() { 1232 @Override 
1233 public Object apply( 1234 DeferredCloser closer, TestCloseable v1, Object v2, String v3, String v4) 1235 throws Exception { 1236 expect.fail(); 1237 throw new AssertionError(); 1238 } 1239 }, 1240 executor); 1241 assertFinallyFailsWithException(closingFuture); 1242 waitUntilClosed(closingFuture); 1243 assertClosed(closeable1); 1244 } 1245 testWhenAllSucceed4_call_cancelledPipeline()1246 public void testWhenAllSucceed4_call_cancelledPipeline() throws Exception { 1247 ClosingFuture<TestCloseable> closingFuture = 1248 ClosingFuture.whenAllSucceed( 1249 ClosingFuture.from(immediateFuture(closeable1)), 1250 ClosingFuture.from(immediateFuture(closeable2)), 1251 ClosingFuture.from(immediateFuture("value3")), 1252 ClosingFuture.from(immediateFuture("value4"))) 1253 .call( 1254 waiter.waitFor( 1255 new ClosingFunction4< 1256 TestCloseable, TestCloseable, String, String, TestCloseable>() { 1257 @Override 1258 public TestCloseable apply( 1259 DeferredCloser closer, 1260 TestCloseable v1, 1261 TestCloseable v2, 1262 String v3, 1263 String v4) 1264 throws Exception { 1265 awaitUninterruptibly(futureCancelled); 1266 closer.eventuallyClose(closeable1, closingExecutor); 1267 closer.eventuallyClose(closeable2, closingExecutor); 1268 return closeable3; 1269 } 1270 }), 1271 executor); 1272 waiter.awaitStarted(); 1273 cancelFinalStepAndWait(closingFuture); 1274 // not closed until the function returns 1275 assertStillOpen(closeable1, closeable2); 1276 waiter.awaitReturned(); 1277 assertClosed(closeable1, closeable2); 1278 assertStillOpen(closeable3); 1279 } 1280 testWhenAllSucceed4_call_throws()1281 public void testWhenAllSucceed4_call_throws() throws Exception { 1282 ClosingFuture<Object> closingFuture = 1283 ClosingFuture.whenAllSucceed( 1284 ClosingFuture.from(immediateFuture(closeable1)), 1285 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor), 1286 ClosingFuture.from(immediateFuture("value3")), 1287 ClosingFuture.from(immediateFuture("value4"))) 
1288 .call( 1289 new ClosingFunction4<TestCloseable, TestCloseable, String, String, Object>() { 1290 @Override 1291 public Object apply( 1292 DeferredCloser closer, 1293 TestCloseable v1, 1294 TestCloseable v2, 1295 String v3, 1296 String v4) 1297 throws Exception { 1298 closer.eventuallyClose(closeable3, closingExecutor); 1299 throw exception; 1300 } 1301 }, 1302 executor); 1303 assertFinallyFailsWithException(closingFuture); 1304 waitUntilClosed(closingFuture); 1305 assertStillOpen(closeable1); 1306 assertClosed(closeable2, closeable3); 1307 } 1308 testWhenAllSucceed5_call()1309 public void testWhenAllSucceed5_call() throws ExecutionException, IOException { 1310 ClosingFuture<TestCloseable> closingFuture = 1311 ClosingFuture.whenAllSucceed( 1312 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1313 ClosingFuture.from(immediateFuture("value2")), 1314 ClosingFuture.from(immediateFuture("value3")), 1315 ClosingFuture.from(immediateFuture("value4")), 1316 ClosingFuture.from(immediateFuture("value5"))) 1317 .call( 1318 new ClosingFunction5< 1319 TestCloseable, String, String, String, String, TestCloseable>() { 1320 @Override 1321 public TestCloseable apply( 1322 DeferredCloser closer, 1323 TestCloseable v1, 1324 String v2, 1325 String v3, 1326 String v4, 1327 String v5) 1328 throws Exception { 1329 assertThat(v1).isEqualTo(closeable1); 1330 assertThat(v2).isEqualTo("value2"); 1331 assertThat(v3).isEqualTo("value3"); 1332 assertThat(v4).isEqualTo("value4"); 1333 assertThat(v5).isEqualTo("value5"); 1334 assertStillOpen(closeable1); 1335 closer.eventuallyClose(closeable2, closingExecutor); 1336 return closeable2; 1337 } 1338 }, 1339 executor); 1340 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); 1341 waitUntilClosed(closingFuture); 1342 assertClosed(closeable1, closeable2); 1343 } 1344 testWhenAllSucceed5_call_failedInput()1345 public void testWhenAllSucceed5_call_failedInput() throws ExecutionException, IOException { 
1346 ClosingFuture<Object> closingFuture = 1347 ClosingFuture.whenAllSucceed( 1348 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1349 failedClosingFuture(), 1350 ClosingFuture.from(immediateFuture("value3")), 1351 ClosingFuture.from(immediateFuture("value4")), 1352 ClosingFuture.from(immediateFuture("value5"))) 1353 .call( 1354 new ClosingFunction5<TestCloseable, Object, String, String, String, Object>() { 1355 @Override 1356 public Object apply( 1357 DeferredCloser closer, 1358 TestCloseable v1, 1359 Object v2, 1360 String v3, 1361 String v4, 1362 String v5) 1363 throws Exception { 1364 expect.fail(); 1365 throw new AssertionError(); 1366 } 1367 }, 1368 executor); 1369 assertFinallyFailsWithException(closingFuture); 1370 waitUntilClosed(closingFuture); 1371 assertClosed(closeable1); 1372 } 1373 testWhenAllSucceed5_call_cancelledPipeline()1374 public void testWhenAllSucceed5_call_cancelledPipeline() throws Exception { 1375 ClosingFuture<TestCloseable> closingFuture = 1376 ClosingFuture.whenAllSucceed( 1377 ClosingFuture.from(immediateFuture(closeable1)), 1378 ClosingFuture.from(immediateFuture(closeable2)), 1379 ClosingFuture.from(immediateFuture("value3")), 1380 ClosingFuture.from(immediateFuture("value4")), 1381 ClosingFuture.from(immediateFuture("value5"))) 1382 .call( 1383 waiter.waitFor( 1384 new ClosingFunction5< 1385 TestCloseable, TestCloseable, String, String, String, TestCloseable>() { 1386 @Override 1387 public TestCloseable apply( 1388 DeferredCloser closer, 1389 TestCloseable v1, 1390 TestCloseable v2, 1391 String v3, 1392 String v4, 1393 String v5) 1394 throws Exception { 1395 awaitUninterruptibly(futureCancelled); 1396 closer.eventuallyClose(closeable1, closingExecutor); 1397 closer.eventuallyClose(closeable2, closingExecutor); 1398 return closeable3; 1399 } 1400 }), 1401 executor); 1402 waiter.awaitStarted(); 1403 cancelFinalStepAndWait(closingFuture); 1404 // not closed until the function returns 1405 
assertStillOpen(closeable1, closeable2); 1406 waiter.awaitReturned(); 1407 assertClosed(closeable1, closeable2); 1408 assertStillOpen(closeable3); 1409 } 1410 testWhenAllSucceed5_call_throws()1411 public void testWhenAllSucceed5_call_throws() throws Exception { 1412 ClosingFuture<Object> closingFuture = 1413 ClosingFuture.whenAllSucceed( 1414 ClosingFuture.from(immediateFuture(closeable1)), 1415 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor), 1416 ClosingFuture.from(immediateFuture("value3")), 1417 ClosingFuture.from(immediateFuture("value4")), 1418 ClosingFuture.from(immediateFuture("value5"))) 1419 .call( 1420 new ClosingFunction5< 1421 TestCloseable, TestCloseable, String, String, String, Object>() { 1422 @Override 1423 public Object apply( 1424 DeferredCloser closer, 1425 TestCloseable v1, 1426 TestCloseable v2, 1427 String v3, 1428 String v4, 1429 String v5) 1430 throws Exception { 1431 closer.eventuallyClose(closeable3, closingExecutor); 1432 throw exception; 1433 } 1434 }, 1435 executor); 1436 assertFinallyFailsWithException(closingFuture); 1437 waitUntilClosed(closingFuture); 1438 assertStillOpen(closeable1); 1439 assertClosed(closeable2, closeable3); 1440 } 1441 testTransform_preventsFurtherOperations()1442 public void testTransform_preventsFurtherOperations() { 1443 ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); 1444 ClosingFuture<String> unused = 1445 closingFuture.transform( 1446 new ClosingFunction<String, String>() { 1447 @Override 1448 public String apply(DeferredCloser closer, String v) throws Exception { 1449 return "value2"; 1450 } 1451 }, 1452 executor); 1453 assertDerivingThrowsIllegalStateException(closingFuture); 1454 assertFinalStepThrowsIllegalStateException(closingFuture); 1455 } 1456 testTransformAsync_preventsFurtherOperations()1457 public void testTransformAsync_preventsFurtherOperations() { 1458 ClosingFuture<String> closingFuture = 
ClosingFuture.from(immediateFuture("value1")); 1459 ClosingFuture<String> unused = 1460 closingFuture.transformAsync( 1461 new AsyncClosingFunction<String, String>() { 1462 @Override 1463 public ClosingFuture<String> apply(DeferredCloser closer, String v) throws Exception { 1464 return ClosingFuture.from(immediateFuture("value2")); 1465 } 1466 }, 1467 executor); 1468 assertDerivingThrowsIllegalStateException(closingFuture); 1469 assertFinalStepThrowsIllegalStateException(closingFuture); 1470 } 1471 testCatching_preventsFurtherOperations()1472 public void testCatching_preventsFurtherOperations() { 1473 ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); 1474 ClosingFuture<String> unused = 1475 closingFuture.catching( 1476 Exception.class, 1477 new ClosingFunction<Exception, String>() { 1478 @Override 1479 public String apply(DeferredCloser closer, Exception x) throws Exception { 1480 return "value2"; 1481 } 1482 }, 1483 executor); 1484 assertDerivingThrowsIllegalStateException(closingFuture); 1485 assertFinalStepThrowsIllegalStateException(closingFuture); 1486 } 1487 testCatchingAsync_preventsFurtherOperations()1488 public void testCatchingAsync_preventsFurtherOperations() { 1489 ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); 1490 ClosingFuture<String> unused = 1491 closingFuture.catchingAsync( 1492 Exception.class, 1493 ClosingFuture.withoutCloser( 1494 new AsyncFunction<Exception, String>() { 1495 @Override 1496 public ListenableFuture<String> apply(Exception x) throws Exception { 1497 return immediateFuture("value2"); 1498 } 1499 }), 1500 executor); 1501 assertDerivingThrowsIllegalStateException(closingFuture); 1502 assertFinalStepThrowsIllegalStateException(closingFuture); 1503 } 1504 testWhenAllComplete_preventsFurtherOperations()1505 public void testWhenAllComplete_preventsFurtherOperations() { 1506 ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); 1507 
Combiner unused = ClosingFuture.whenAllComplete(asList(closingFuture)); 1508 assertDerivingThrowsIllegalStateException(closingFuture); 1509 assertFinalStepThrowsIllegalStateException(closingFuture); 1510 } 1511 testWhenAllSucceed_preventsFurtherOperations()1512 public void testWhenAllSucceed_preventsFurtherOperations() { 1513 ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); 1514 Combiner unused = ClosingFuture.whenAllSucceed(asList(closingFuture)); 1515 assertDerivingThrowsIllegalStateException(closingFuture); 1516 assertFinalStepThrowsIllegalStateException(closingFuture); 1517 } 1518 assertDerivingThrowsIllegalStateException( ClosingFuture<String> closingFuture)1519 protected final void assertDerivingThrowsIllegalStateException( 1520 ClosingFuture<String> closingFuture) { 1521 try { 1522 closingFuture.transform( 1523 new ClosingFunction<String, String>() { 1524 @Override 1525 public String apply(DeferredCloser closer3, String v1) throws Exception { 1526 return "value3"; 1527 } 1528 }, 1529 executor); 1530 fail(); 1531 } catch (IllegalStateException expected5) { 1532 } 1533 try { 1534 closingFuture.transformAsync( 1535 new AsyncClosingFunction<String, String>() { 1536 @Override 1537 public ClosingFuture<String> apply(DeferredCloser closer2, String v) throws Exception { 1538 return ClosingFuture.from(immediateFuture("value3")); 1539 } 1540 }, 1541 executor); 1542 fail(); 1543 } catch (IllegalStateException expected4) { 1544 } 1545 try { 1546 closingFuture.catching( 1547 Exception.class, 1548 new ClosingFunction<Exception, String>() { 1549 @Override 1550 public String apply(DeferredCloser closer1, Exception x1) throws Exception { 1551 return "value3"; 1552 } 1553 }, 1554 executor); 1555 fail(); 1556 } catch (IllegalStateException expected3) { 1557 } 1558 try { 1559 closingFuture.catchingAsync( 1560 Exception.class, 1561 new AsyncClosingFunction<Exception, String>() { 1562 @Override 1563 public ClosingFuture<String> 
apply(DeferredCloser closer, Exception x) 1564 throws Exception { 1565 return ClosingFuture.from(immediateFuture("value3")); 1566 } 1567 }, 1568 executor); 1569 fail(); 1570 } catch (IllegalStateException expected2) { 1571 } 1572 try { 1573 ClosingFuture.whenAllComplete(asList(closingFuture)); 1574 fail(); 1575 } catch (IllegalStateException expected1) { 1576 } 1577 try { 1578 ClosingFuture.whenAllSucceed(asList(closingFuture)); 1579 fail(); 1580 } catch (IllegalStateException expected) { 1581 } 1582 } 1583 1584 /** Asserts that marking this step a final step throws {@link IllegalStateException}. */ assertFinalStepThrowsIllegalStateException(ClosingFuture<?> closingFuture)1585 protected void assertFinalStepThrowsIllegalStateException(ClosingFuture<?> closingFuture) { 1586 try { 1587 closingFuture.finishToFuture(); 1588 fail(); 1589 } catch (IllegalStateException expected) { 1590 } 1591 try { 1592 closingFuture.finishToValueAndCloser(new NoOpValueAndCloserConsumer<>(), executor); 1593 fail(); 1594 } catch (IllegalStateException expected) { 1595 } 1596 } 1597 1598 // Avoid infinite recursion if a closeable's close() method throws RejectedExecutionException and 1599 // is closed using the direct executor. testCloseThrowsRejectedExecutionException()1600 public void testCloseThrowsRejectedExecutionException() throws Exception { 1601 doThrow(new RejectedExecutionException()).when(mockCloseable).close(); 1602 ClosingFuture<Closeable> closingFuture = 1603 ClosingFuture.submit( 1604 new ClosingCallable<Closeable>() { 1605 @Override 1606 public Closeable call(DeferredCloser closer) throws Exception { 1607 return closer.eventuallyClose(mockCloseable, directExecutor()); 1608 } 1609 }, 1610 executor); 1611 assertThat(getFinalValue(closingFuture)).isEqualTo(mockCloseable); 1612 waitUntilClosed(closingFuture); 1613 verify(mockCloseable, timeout(1000)).close(); 1614 } 1615 1616 /** 1617 * Marks the given step final, waits for it to be finished, and returns the value. 
1618 * 1619 * @throws ExecutionException if the step failed 1620 * @throws CancellationException if the step was cancelled 1621 */ getFinalValue(ClosingFuture<T> closingFuture)1622 abstract <T> T getFinalValue(ClosingFuture<T> closingFuture) throws ExecutionException; 1623 1624 /** Marks the given step final, cancels it, and waits for the cancellation to happen. */ cancelFinalStepAndWait(ClosingFuture<TestCloseable> closingFuture)1625 abstract void cancelFinalStepAndWait(ClosingFuture<TestCloseable> closingFuture); 1626 1627 /** 1628 * Marks the given step final and waits for it to fail. Expects the failure exception to match 1629 * {@link AbstractClosingFutureTest#exception}. 1630 */ assertFinallyFailsWithException(ClosingFuture<?> closingFuture)1631 abstract void assertFinallyFailsWithException(ClosingFuture<?> closingFuture); 1632 1633 /** Waits for the given step to be canceled. */ assertBecomesCanceled(ClosingFuture<?> closingFuture)1634 abstract void assertBecomesCanceled(ClosingFuture<?> closingFuture) throws ExecutionException; 1635 1636 /** Waits for the given step's closeables to be closed. 
*/ waitUntilClosed(ClosingFuture<?> closingFuture)1637 void waitUntilClosed(ClosingFuture<?> closingFuture) { 1638 assertTrue(awaitUninterruptibly(closingFuture.whenClosedCountDown(), 1, SECONDS)); 1639 } 1640 assertThatFutureFailsWithException(Future<?> future)1641 void assertThatFutureFailsWithException(Future<?> future) { 1642 try { 1643 getUninterruptibly(future); 1644 fail("Expected future to fail: " + future); 1645 } catch (ExecutionException e) { 1646 assertThat(e).hasCauseThat().isSameInstanceAs(exception); 1647 } 1648 } 1649 assertThatFutureBecomesCancelled(Future<?> future)1650 static void assertThatFutureBecomesCancelled(Future<?> future) throws ExecutionException { 1651 try { 1652 getUninterruptibly(future); 1653 fail("Expected future to be canceled: " + future); 1654 } catch (CancellationException expected) { 1655 } 1656 } 1657 assertStillOpen(TestCloseable closeable1, TestCloseable... moreCloseables)1658 private static void assertStillOpen(TestCloseable closeable1, TestCloseable... moreCloseables) 1659 throws IOException { 1660 for (TestCloseable closeable : asList(closeable1, moreCloseables)) { 1661 assertWithMessage("%s.stillOpen()", closeable).that(closeable.stillOpen()).isTrue(); 1662 } 1663 } 1664 assertClosed(TestCloseable closeable1, TestCloseable... moreCloseables)1665 static void assertClosed(TestCloseable closeable1, TestCloseable... 
moreCloseables) 1666 throws IOException { 1667 for (TestCloseable closeable : asList(closeable1, moreCloseables)) { 1668 assertWithMessage("%s.isClosed()", closeable).that(closeable.awaitClosed()).isTrue(); 1669 } 1670 } 1671 failedClosingFuture()1672 private ClosingFuture<Object> failedClosingFuture() { 1673 return ClosingFuture.from(immediateFailedFuture(exception)); 1674 } 1675 assertNoExpectedFailures()1676 private void assertNoExpectedFailures() { 1677 assertWithMessage("executor was shut down") 1678 .that(shutdownAndAwaitTermination(executor, 10, SECONDS)) 1679 .isTrue(); 1680 assertWithMessage("closingExecutor was shut down") 1681 .that(shutdownAndAwaitTermination(closingExecutor, 10, SECONDS)) 1682 .isTrue(); 1683 if (!failures.isEmpty()) { 1684 StringWriter message = new StringWriter(); 1685 PrintWriter writer = new PrintWriter(message); 1686 writer.println("Expected no failures, but found:"); 1687 for (AssertionError failure : failures) { 1688 failure.printStackTrace(writer); 1689 } 1690 failures.clear(); 1691 assertWithMessage(message.toString()).fail(); 1692 } 1693 } 1694 1695 static final class TestCloseable implements Closeable { 1696 private final CountDownLatch latch = new CountDownLatch(1); 1697 private final String name; 1698 TestCloseable(String name)1699 TestCloseable(String name) { 1700 this.name = name; 1701 } 1702 1703 @Override close()1704 public void close() throws IOException { 1705 latch.countDown(); 1706 } 1707 awaitClosed()1708 boolean awaitClosed() { 1709 return awaitUninterruptibly(latch, 10, SECONDS); 1710 } 1711 stillOpen()1712 boolean stillOpen() { 1713 return !awaitUninterruptibly(latch, 1, SECONDS); 1714 } 1715 1716 @Override toString()1717 public String toString() { 1718 return name; 1719 } 1720 } 1721 1722 static final class Waiter { 1723 private final CountDownLatch started = new CountDownLatch(1); 1724 private final CountDownLatch canReturn = new CountDownLatch(1); 1725 private final CountDownLatch returned = new 
CountDownLatch(1); 1726 private Object proxy; 1727 waitFor(Callable<V> callable)1728 <V> Callable<V> waitFor(Callable<V> callable) { 1729 return waitFor(callable, Callable.class); 1730 } 1731 waitFor(ClosingCallable<V> closingCallable)1732 <V> ClosingCallable<V> waitFor(ClosingCallable<V> closingCallable) { 1733 return waitFor(closingCallable, ClosingCallable.class); 1734 } 1735 waitFor(AsyncClosingCallable<V> asyncClosingCallable)1736 <V> AsyncClosingCallable<V> waitFor(AsyncClosingCallable<V> asyncClosingCallable) { 1737 return waitFor(asyncClosingCallable, AsyncClosingCallable.class); 1738 } 1739 waitFor(ClosingFunction<T, U> closingFunction)1740 <T, U> ClosingFunction<T, U> waitFor(ClosingFunction<T, U> closingFunction) { 1741 return waitFor(closingFunction, ClosingFunction.class); 1742 } 1743 waitFor(AsyncClosingFunction<T, U> asyncClosingFunction)1744 <T, U> AsyncClosingFunction<T, U> waitFor(AsyncClosingFunction<T, U> asyncClosingFunction) { 1745 return waitFor(asyncClosingFunction, AsyncClosingFunction.class); 1746 } 1747 waitFor(CombiningCallable<V> combiningCallable)1748 <V> CombiningCallable<V> waitFor(CombiningCallable<V> combiningCallable) { 1749 return waitFor(combiningCallable, CombiningCallable.class); 1750 } 1751 waitFor(AsyncCombiningCallable<V> asyncCombiningCallable)1752 <V> AsyncCombiningCallable<V> waitFor(AsyncCombiningCallable<V> asyncCombiningCallable) { 1753 return waitFor(asyncCombiningCallable, AsyncCombiningCallable.class); 1754 } 1755 waitFor(ClosingFunction2<V1, V2, U> closingFunction2)1756 <V1, V2, U> ClosingFunction2<V1, V2, U> waitFor(ClosingFunction2<V1, V2, U> closingFunction2) { 1757 return waitFor(closingFunction2, ClosingFunction2.class); 1758 } 1759 waitFor( AsyncClosingFunction2<V1, V2, U> asyncClosingFunction2)1760 <V1, V2, U> AsyncClosingFunction2<V1, V2, U> waitFor( 1761 AsyncClosingFunction2<V1, V2, U> asyncClosingFunction2) { 1762 return waitFor(asyncClosingFunction2, AsyncClosingFunction2.class); 1763 } 1764 waitFor( 
ClosingFunction3<V1, V2, V3, U> closingFunction3)1765 <V1, V2, V3, U> ClosingFunction3<V1, V2, V3, U> waitFor( 1766 ClosingFunction3<V1, V2, V3, U> closingFunction3) { 1767 return waitFor(closingFunction3, ClosingFunction3.class); 1768 } 1769 waitFor( ClosingFunction4<V1, V2, V3, V4, U> closingFunction4)1770 <V1, V2, V3, V4, U> ClosingFunction4<V1, V2, V3, V4, U> waitFor( 1771 ClosingFunction4<V1, V2, V3, V4, U> closingFunction4) { 1772 return waitFor(closingFunction4, ClosingFunction4.class); 1773 } 1774 waitFor( ClosingFunction5<V1, V2, V3, V4, V5, U> closingFunction5)1775 <V1, V2, V3, V4, V5, U> ClosingFunction5<V1, V2, V3, V4, V5, U> waitFor( 1776 ClosingFunction5<V1, V2, V3, V4, V5, U> closingFunction5) { 1777 return waitFor(closingFunction5, ClosingFunction5.class); 1778 } 1779 waitFor(final T delegate, final Class<T> type)1780 <T> T waitFor(final T delegate, final Class<T> type) { 1781 checkState(proxy == null); 1782 T proxyObject = 1783 Reflection.newProxy( 1784 type, 1785 new InvocationHandler() { 1786 @Override 1787 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 1788 if (!method.getDeclaringClass().equals(type)) { 1789 return method.invoke(delegate, args); 1790 } 1791 checkState(started.getCount() == 1); 1792 started.countDown(); 1793 try { 1794 return method.invoke(delegate, args); 1795 } catch (InvocationTargetException e) { 1796 throw e.getCause(); 1797 } finally { 1798 awaitUninterruptibly(canReturn); 1799 returned.countDown(); 1800 } 1801 } 1802 }); 1803 this.proxy = proxyObject; 1804 return proxyObject; 1805 } 1806 awaitStarted()1807 void awaitStarted() { 1808 assertTrue(awaitUninterruptibly(started, 10, SECONDS)); 1809 } 1810 awaitReturned()1811 void awaitReturned() { 1812 canReturn.countDown(); 1813 assertTrue(awaitUninterruptibly(returned, 10, SECONDS)); 1814 } 1815 } 1816 1817 static final class NoOpValueAndCloserConsumer<V> implements ValueAndCloserConsumer<V> { 1818 @Override accept(ValueAndCloser<V> 
valueAndCloser)1819 public void accept(ValueAndCloser<V> valueAndCloser) {} 1820 } 1821 } 1822