1 /* 2 * Copyright (C) 2017 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.Preconditions.checkState; 20 import static com.google.common.collect.Lists.asList; 21 import static com.google.common.truth.Truth.assertThat; 22 import static com.google.common.truth.Truth.assertWithMessage; 23 import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; 24 import static com.google.common.util.concurrent.Futures.immediateFailedFuture; 25 import static com.google.common.util.concurrent.Futures.immediateFuture; 26 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 27 import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination; 28 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; 29 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 30 import static java.util.Arrays.asList; 31 import static java.util.concurrent.Executors.newSingleThreadExecutor; 32 import static java.util.concurrent.TimeUnit.SECONDS; 33 import static org.junit.Assert.assertThrows; 34 import static org.mockito.Mockito.doThrow; 35 import static org.mockito.Mockito.timeout; 36 import static org.mockito.Mockito.verify; 37 38 import com.google.common.collect.ImmutableList; 39 import com.google.common.reflect.Reflection; 40 import com.google.common.truth.FailureStrategy; 41 import com.google.common.truth.StandardSubjectBuilder; 42 import com.google.common.util.concurrent.ClosingFuture.AsyncClosingCallable; 43 import com.google.common.util.concurrent.ClosingFuture.AsyncClosingFunction; 44 import com.google.common.util.concurrent.ClosingFuture.ClosingCallable; 45 import com.google.common.util.concurrent.ClosingFuture.ClosingFunction; 46 import com.google.common.util.concurrent.ClosingFuture.Combiner; 47 import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 48 import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 49 import com.google.common.util.concurrent.ClosingFuture.Combiner2.AsyncClosingFunction2; 50 import com.google.common.util.concurrent.ClosingFuture.Combiner2.ClosingFunction2; 51 import com.google.common.util.concurrent.ClosingFuture.Combiner3.ClosingFunction3; 52 import com.google.common.util.concurrent.ClosingFuture.Combiner4.ClosingFunction4; 53 import com.google.common.util.concurrent.ClosingFuture.Combiner5.ClosingFunction5; 54 import com.google.common.util.concurrent.ClosingFuture.DeferredCloser; 55 import com.google.common.util.concurrent.ClosingFuture.Peeker; 56 import com.google.common.util.concurrent.ClosingFuture.ValueAndCloser; 57 import com.google.common.util.concurrent.ClosingFuture.ValueAndCloserConsumer; 58 import java.io.Closeable; 59 import java.io.IOException; 60 import java.io.PrintWriter; 61 import java.io.StringWriter; 62 import java.lang.reflect.InvocationHandler; 63 import java.lang.reflect.InvocationTargetException; 64 import java.lang.reflect.Method; 65 import java.util.ArrayList; 66 import java.util.List; 67 import java.util.concurrent.Callable; 68 import java.util.concurrent.CancellationException; 69 import java.util.concurrent.CountDownLatch; 70 import java.util.concurrent.ExecutionException; 71 import java.util.concurrent.Executor; 72 import java.util.concurrent.ExecutorService; 73 import java.util.concurrent.Future; 74 import java.util.concurrent.RejectedExecutionException; 75 import java.util.concurrent.atomic.AtomicReference; 76 import junit.framework.TestCase; 77 import org.mockito.Mockito; 78 79 /** 80 * Tests for {@link ClosingFuture}. Subclasses exercise either the {@link 81 * ClosingFuture#finishToFuture()} or {@link 82 * ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} paths to complete a 83 * {@link ClosingFuture} pipeline. 84 */ 85 public abstract class AbstractClosingFutureTest extends TestCase { 86 // TODO(dpb): Use Expect once that supports JUnit 3, or we can use JUnit 4. 87 final List<AssertionError> failures = new ArrayList<>(); 88 final StandardSubjectBuilder expect = 89 StandardSubjectBuilder.forCustomFailureStrategy( 90 new FailureStrategy() { 91 @Override 92 public void fail(AssertionError failure) { 93 failures.add(failure); 94 } 95 }); 96 97 final ListeningExecutorService executor = 98 MoreExecutors.listeningDecorator(newSingleThreadExecutor()); 99 final ExecutorService closingExecutor = newSingleThreadExecutor(); 100 101 final TestCloseable closeable1 = new TestCloseable("closeable1"); 102 final TestCloseable closeable2 = new TestCloseable("closeable2"); 103 final TestCloseable closeable3 = new TestCloseable("closeable3"); 104 final TestCloseable closeable4 = new TestCloseable("closeable4"); 105 106 final Waiter waiter = new Waiter(); 107 final CountDownLatch futureCancelled = new CountDownLatch(1); 108 final Exception exception = new Exception(); 109 final Closeable mockCloseable = Mockito.mock(Closeable.class); 110 111 @Override tearDown()112 protected void tearDown() throws Exception { 113 assertNoExpectedFailures(); 114 super.tearDown(); 115 } 116 testFrom()117 public void testFrom() throws Exception { 118 ClosingFuture<String> closingFuture = 119 ClosingFuture.from(executor.submit(Callables.returning(closeable1))) 120 .transform( 121 new ClosingFunction<TestCloseable, String>() { 122 @Override 123 public String apply(DeferredCloser closer, TestCloseable v) throws Exception { 124 assertThat(v).isSameInstanceAs(closeable1); 125 return "value"; 126 } 127 }, 128 executor); 129 assertThat(getFinalValue(closingFuture)).isEqualTo("value"); 130 waitUntilClosed(closingFuture); 131 assertStillOpen(closeable1); 132 } 133 testFrom_failedInput()134 public void testFrom_failedInput() throws Exception { 135 assertFinallyFailsWithException(failedClosingFuture()); 136 } 137 testFrom_cancelledInput()138 public void testFrom_cancelledInput() throws Exception { 139 assertBecomesCanceled(ClosingFuture.from(immediateCancelledFuture())); 140 } 141 testEventuallyClosing()142 public void testEventuallyClosing() throws Exception { 143 ClosingFuture<String> closingFuture = 144 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor) 145 .transform( 146 new ClosingFunction<TestCloseable, String>() { 147 @Override 148 public String apply(DeferredCloser closer, TestCloseable v) throws Exception { 149 assertThat(v).isSameInstanceAs(closeable1); 150 assertStillOpen(closeable1); 151 return "value"; 152 } 153 }, 154 executor); 155 assertThat(getFinalValue(closingFuture)).isEqualTo("value"); 156 waitUntilClosed(closingFuture); 157 assertClosed(closeable1); 158 } 159 testEventuallyClosing_failedInput()160 public void testEventuallyClosing_failedInput() throws Exception { 161 assertFinallyFailsWithException( 162 ClosingFuture.eventuallyClosing( 163 Futures.<Closeable>immediateFailedFuture(exception), closingExecutor)); 164 } 165 testEventuallyClosing_cancelledInput()166 public void testEventuallyClosing_cancelledInput() throws Exception { 167 assertBecomesCanceled( 168 ClosingFuture.eventuallyClosing( 169 Futures.<Closeable>immediateCancelledFuture(), closingExecutor)); 170 } 171 testEventuallyClosing_cancelledPipeline()172 public void testEventuallyClosing_cancelledPipeline() throws Exception { 173 ClosingFuture<TestCloseable> closingFuture = 174 ClosingFuture.eventuallyClosing( 175 executor.submit( 176 waiter.waitFor( 177 new Callable<TestCloseable>() { 178 @Override 179 public TestCloseable call() throws InterruptedException { 180 awaitUninterruptibly(futureCancelled); 181 return closeable1; 182 } 183 })), 184 closingExecutor); 185 waiter.awaitStarted(); 186 cancelFinalStepAndWait(closingFuture); 187 // not closed until the callable returns 188 assertStillOpen(closeable1); 189 waiter.awaitReturned(); 190 assertClosed(closeable1); 191 } 192 testEventuallyClosing_throws()193 public void testEventuallyClosing_throws() throws Exception { 194 assertFinallyFailsWithException( 195 ClosingFuture.eventuallyClosing( 196 executor.submit( 197 new Callable<TestCloseable>() { 198 @Override 199 public TestCloseable call() throws Exception { 200 throw exception; 201 } 202 }), 203 closingExecutor)); 204 } 205 testSubmit()206 public void testSubmit() throws Exception { 207 ClosingFuture<String> closingFuture = 208 ClosingFuture.submit( 209 new ClosingCallable<TestCloseable>() { 210 @Override 211 public TestCloseable call(DeferredCloser closer) throws Exception { 212 closer.eventuallyClose(closeable1, closingExecutor); 213 closer.eventuallyClose(closeable2, closingExecutor); 214 return closeable3; 215 } 216 }, 217 executor) 218 .transform( 219 new ClosingFunction<TestCloseable, String>() { 220 @Override 221 public String apply(DeferredCloser closer, TestCloseable v) throws Exception { 222 assertThat(v).isSameInstanceAs(closeable3); 223 assertStillOpen(closeable1, closeable2, closeable3); 224 return "value"; 225 } 226 }, 227 executor); 228 assertThat(getFinalValue(closingFuture)).isEqualTo("value"); 229 waitUntilClosed(closingFuture); 230 assertClosed(closeable1, closeable2); 231 assertStillOpen(closeable3); 232 } 233 testSubmit_cancelledPipeline()234 public void testSubmit_cancelledPipeline() throws Exception { 235 ClosingFuture<TestCloseable> closingFuture = 236 ClosingFuture.submit( 237 waiter.waitFor( 238 new ClosingCallable<TestCloseable>() { 239 @Override 240 public TestCloseable call(DeferredCloser closer) throws Exception { 241 awaitUninterruptibly(futureCancelled); 242 closer.eventuallyClose(closeable1, closingExecutor); 243 closer.eventuallyClose(closeable2, closingExecutor); 244 return closeable3; 245 } 246 }), 247 executor); 248 waiter.awaitStarted(); 249 cancelFinalStepAndWait(closingFuture); 250 waiter.awaitReturned(); 251 assertClosed(closeable1, closeable2); 252 assertStillOpen(closeable3); 253 } 254 testSubmit_throws()255 public void testSubmit_throws() throws Exception { 256 ClosingFuture<Object> closingFuture = 257 ClosingFuture.submit( 258 new ClosingCallable<Object>() { 259 @Override 260 public Object call(DeferredCloser closer) throws Exception { 261 closer.eventuallyClose(closeable1, closingExecutor); 262 closer.eventuallyClose(closeable2, closingExecutor); 263 throw exception; 264 } 265 }, 266 executor); 267 assertFinallyFailsWithException(closingFuture); 268 waitUntilClosed(closingFuture); 269 assertClosed(closeable1, closeable2); 270 } 271 testSubmitAsync()272 public void testSubmitAsync() throws Exception { 273 ClosingFuture<TestCloseable> closingFuture = 274 ClosingFuture.submitAsync( 275 new AsyncClosingCallable<TestCloseable>() { 276 @Override 277 public ClosingFuture<TestCloseable> call(DeferredCloser closer) { 278 closer.eventuallyClose(closeable1, closingExecutor); 279 return ClosingFuture.submit( 280 new ClosingCallable<TestCloseable>() { 281 @Override 282 public TestCloseable call(DeferredCloser deferredCloser) throws Exception { 283 return closeable2; 284 } 285 }, 286 directExecutor()); 287 } 288 }, 289 executor); 290 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); 291 waitUntilClosed(closingFuture); 292 assertClosed(closeable1); 293 assertStillOpen(closeable2); 294 } 295 testSubmitAsync_cancelledPipeline()296 public void testSubmitAsync_cancelledPipeline() throws Exception { 297 ClosingFuture<TestCloseable> closingFuture = 298 ClosingFuture.submitAsync( 299 waiter.waitFor( 300 new AsyncClosingCallable<TestCloseable>() { 301 @Override 302 public ClosingFuture<TestCloseable> call(DeferredCloser closer) throws Exception { 303 awaitUninterruptibly(futureCancelled); 304 closer.eventuallyClose(closeable1, closingExecutor); 305 closer.eventuallyClose(closeable2, closingExecutor); 306 return ClosingFuture.submit( 307 new ClosingCallable<TestCloseable>() { 308 @Override 309 public TestCloseable call(DeferredCloser deferredCloser) 310 throws Exception { 311 deferredCloser.eventuallyClose(closeable3, closingExecutor); 312 return closeable3; 313 } 314 }, 315 directExecutor()); 316 } 317 }), 318 executor); 319 waiter.awaitStarted(); 320 cancelFinalStepAndWait(closingFuture); 321 waiter.awaitReturned(); 322 assertClosed(closeable1, closeable2, closeable3); 323 } 324 testSubmitAsync_throws()325 public void testSubmitAsync_throws() throws Exception { 326 ClosingFuture<Object> closingFuture = 327 ClosingFuture.submitAsync( 328 new AsyncClosingCallable<Object>() { 329 @Override 330 public ClosingFuture<Object> call(DeferredCloser closer) throws Exception { 331 closer.eventuallyClose(closeable1, closingExecutor); 332 closer.eventuallyClose(closeable2, closingExecutor); 333 throw exception; 334 } 335 }, 336 executor); 337 assertFinallyFailsWithException(closingFuture); 338 waitUntilClosed(closingFuture); 339 assertClosed(closeable1, closeable2); 340 } 341 testStatusFuture()342 public void testStatusFuture() throws Exception { 343 ClosingFuture<String> closingFuture = 344 ClosingFuture.submit( 345 waiter.waitFor( 346 new ClosingCallable<String>() { 347 @Override 348 public String call(DeferredCloser closer) throws Exception { 349 return "value"; 350 } 351 }), 352 executor); 353 ListenableFuture<?> statusFuture = closingFuture.statusFuture(); 354 waiter.awaitStarted(); 355 assertThat(statusFuture.isDone()).isFalse(); 356 waiter.awaitReturned(); 357 assertThat(getUninterruptibly(statusFuture)).isNull(); 358 } 359 testStatusFuture_failure()360 public void testStatusFuture_failure() throws Exception { 361 ClosingFuture<String> closingFuture = 362 ClosingFuture.submit( 363 waiter.waitFor( 364 new ClosingCallable<String>() { 365 @Override 366 public String call(DeferredCloser closer) throws Exception { 367 throw exception; 368 } 369 }), 370 executor); 371 ListenableFuture<?> statusFuture = closingFuture.statusFuture(); 372 waiter.awaitStarted(); 373 assertThat(statusFuture.isDone()).isFalse(); 374 waiter.awaitReturned(); 375 assertThatFutureFailsWithException(statusFuture); 376 } 377 testStatusFuture_cancelDoesNothing()378 public void testStatusFuture_cancelDoesNothing() throws Exception { 379 ClosingFuture<String> closingFuture = 380 ClosingFuture.submit( 381 waiter.waitFor( 382 new ClosingCallable<String>() { 383 @Override 384 public String call(DeferredCloser closer) throws Exception { 385 return "value"; 386 } 387 }), 388 executor); 389 ListenableFuture<?> statusFuture = closingFuture.statusFuture(); 390 waiter.awaitStarted(); 391 assertThat(statusFuture.isDone()).isFalse(); 392 statusFuture.cancel(true); 393 assertThat(statusFuture.isCancelled()).isTrue(); 394 waiter.awaitReturned(); 395 assertThat(getFinalValue(closingFuture)).isEqualTo("value"); 396 } 397 testCancel_caught()398 public void testCancel_caught() throws Exception { 399 ClosingFuture<String> step0 = ClosingFuture.from(immediateFuture("value 0")); 400 ClosingFuture<String> step1 = 401 step0.transform( 402 new ClosingFunction<String, String>() { 403 @Override 404 public String apply(DeferredCloser closer, String v) throws Exception { 405 closer.eventuallyClose(closeable1, closingExecutor); 406 return "value 1"; 407 } 408 }, 409 executor); 410 Waiter step2Waiter = new Waiter(); 411 ClosingFuture<String> step2 = 412 step1.transform( 413 step2Waiter.waitFor( 414 new ClosingFunction<String, String>() { 415 @Override 416 public String apply(DeferredCloser closer, String v) throws Exception { 417 closer.eventuallyClose(closeable2, closingExecutor); 418 return "value 2"; 419 } 420 }), 421 executor); 422 ClosingFuture<String> step3 = 423 step2.transform( 424 new ClosingFunction<String, String>() { 425 @Override 426 public String apply(DeferredCloser closer, String input) throws Exception { 427 closer.eventuallyClose(closeable3, closingExecutor); 428 return "value 3"; 429 } 430 }, 431 executor); 432 Waiter step4Waiter = new Waiter(); 433 ClosingFuture<String> step4 = 434 step3.catching( 435 CancellationException.class, 436 step4Waiter.waitFor( 437 new ClosingFunction<CancellationException, String>() { 438 @Override 439 public String apply(DeferredCloser closer, CancellationException input) 440 throws Exception { 441 closer.eventuallyClose(closeable4, closingExecutor); 442 return "value 4"; 443 } 444 }), 445 executor); 446 447 // Pause in step 2. 448 step2Waiter.awaitStarted(); 449 450 // Everything should still be open. 451 assertStillOpen(closeable1, closeable2, closeable3, closeable4); 452 453 // Cancel step 3, resume step 2, and pause in step 4. 454 assertWithMessage("step3.cancel()").that(step3.cancel(false)).isTrue(); 455 step2Waiter.awaitReturned(); 456 step4Waiter.awaitStarted(); 457 458 // Step 1 is not cancelled because it was done. 459 assertWithMessage("step1.statusFuture().isCancelled()") 460 .that(step1.statusFuture().isCancelled()) 461 .isFalse(); 462 // But its closeable is closed. 463 assertClosed(closeable1); 464 465 // Step 2 is cancelled because it wasn't complete. 466 assertWithMessage("step2.statusFuture().isCancelled()") 467 .that(step2.statusFuture().isCancelled()) 468 .isTrue(); 469 // Its closeable is closed. 470 assertClosed(closeable2); 471 472 // Step 3 was cancelled before it began 473 assertWithMessage("step3.statusFuture().isCancelled()") 474 .that(step3.statusFuture().isCancelled()) 475 .isTrue(); 476 // Its closeable is still open. 477 assertStillOpen(closeable3); 478 479 // Step 4 is not cancelled, because it caught the cancellation. 480 assertWithMessage("step4.statusFuture().isCancelled()") 481 .that(step4.statusFuture().isCancelled()) 482 .isFalse(); 483 // Its closeable isn't closed yet. 484 assertStillOpen(closeable4); 485 486 // Resume step 4 and complete. 487 step4Waiter.awaitReturned(); 488 assertThat(getFinalValue(step4)).isEqualTo("value 4"); 489 490 // Step 4's closeable is now closed. 491 assertClosed(closeable4); 492 // Step 3 still never ran, so its closeable should still be open. 493 assertStillOpen(closeable3); 494 } 495 testTransform()496 public void testTransform() throws Exception { 497 ClosingFuture<String> closingFuture = 498 ClosingFuture.from(immediateFuture("value")) 499 .transform( 500 new ClosingFunction<String, TestCloseable>() { 501 @Override 502 public TestCloseable apply(DeferredCloser closer, String v) throws Exception { 503 closer.eventuallyClose(closeable1, closingExecutor); 504 closer.eventuallyClose(closeable2, closingExecutor); 505 return closeable3; 506 } 507 }, 508 executor) 509 .transform( 510 new ClosingFunction<TestCloseable, String>() { 511 @Override 512 public String apply(DeferredCloser closer, TestCloseable v) throws Exception { 513 assertThat(v).isSameInstanceAs(closeable3); 514 assertStillOpen(closeable1, closeable2, closeable3); 515 return "value"; 516 } 517 }, 518 executor); 519 assertThat(getFinalValue(closingFuture)).isEqualTo("value"); 520 waitUntilClosed(closingFuture); 521 assertClosed(closeable1, closeable2); 522 assertStillOpen(closeable3); 523 } 524 testTransform_cancelledPipeline()525 public void testTransform_cancelledPipeline() throws Exception { 526 String value = "value"; 527 ClosingFuture<TestCloseable> closingFuture = 528 ClosingFuture.from(immediateFuture(value)) 529 .transform( 530 new ClosingFunction<String, TestCloseable>() { 531 @Override 532 public TestCloseable apply(DeferredCloser closer, String v) throws Exception { 533 return closer.eventuallyClose(closeable1, closingExecutor); 534 } 535 }, 536 executor) 537 .transform( 538 waiter.waitFor( 539 new ClosingFunction<TestCloseable, TestCloseable>() { 540 @Override 541 public TestCloseable apply(DeferredCloser closer, TestCloseable v) 542 throws Exception { 543 awaitUninterruptibly(futureCancelled); 544 closer.eventuallyClose(closeable2, closingExecutor); 545 closer.eventuallyClose(closeable3, closingExecutor); 546 return closeable4; 547 } 548 }), 549 executor); 550 waiter.awaitStarted(); 551 cancelFinalStepAndWait(closingFuture); 552 waiter.awaitReturned(); 553 assertClosed(closeable1, closeable2, closeable3); 554 assertStillOpen(closeable4); 555 } 556 testTransform_throws()557 public void testTransform_throws() throws Exception { 558 ClosingFuture<Object> closingFuture = 559 ClosingFuture.from(immediateFuture("value")) 560 .transform( 561 new ClosingFunction<String, Object>() { 562 @Override 563 public Object apply(DeferredCloser closer, String v) throws Exception { 564 closer.eventuallyClose(closeable1, closingExecutor); 565 closer.eventuallyClose(closeable2, closingExecutor); 566 throw exception; 567 } 568 }, 569 executor); 570 assertFinallyFailsWithException(closingFuture); 571 waitUntilClosed(closingFuture); 572 assertClosed(closeable1, closeable2); 573 } 574 testTransformAsync()575 public void testTransformAsync() throws Exception { 576 ClosingFuture<String> closingFuture = 577 ClosingFuture.from(immediateFuture("value")) 578 .transformAsync( 579 new AsyncClosingFunction<String, TestCloseable>() { 580 @Override 581 public ClosingFuture<TestCloseable> apply(DeferredCloser closer, String v) 582 throws Exception { 583 closer.eventuallyClose(closeable1, closingExecutor); 584 closer.eventuallyClose(closeable2, closingExecutor); 585 return ClosingFuture.eventuallyClosing( 586 immediateFuture(closeable3), closingExecutor); 587 } 588 }, 589 executor) 590 .transform( 591 new ClosingFunction<TestCloseable, String>() { 592 @Override 593 public String apply(DeferredCloser closer, TestCloseable v) throws Exception { 594 assertThat(v).isSameInstanceAs(closeable3); 595 assertStillOpen(closeable1, closeable2, closeable3); 596 return "value"; 597 } 598 }, 599 executor); 600 assertThat(getFinalValue(closingFuture)).isEqualTo("value"); 601 waitUntilClosed(closingFuture); 602 assertClosed(closeable1, closeable2, closeable3); 603 } 604 testTransformAsync_cancelledPipeline()605 public void testTransformAsync_cancelledPipeline() throws Exception { 606 ClosingFuture<TestCloseable> closingFuture = 607 ClosingFuture.from(immediateFuture("value")) 608 .transformAsync( 609 waiter.waitFor( 610 new AsyncClosingFunction<String, TestCloseable>() { 611 @Override 612 public ClosingFuture<TestCloseable> apply(DeferredCloser closer, String v) 613 throws Exception { 614 awaitUninterruptibly(futureCancelled); 615 closer.eventuallyClose(closeable1, closingExecutor); 616 closer.eventuallyClose(closeable2, closingExecutor); 617 return ClosingFuture.eventuallyClosing( 618 immediateFuture(closeable3), closingExecutor); 619 } 620 }), 621 executor); 622 waiter.awaitStarted(); 623 cancelFinalStepAndWait(closingFuture); 624 // not closed until the function returns 625 assertStillOpen(closeable1, closeable2, closeable3); 626 waiter.awaitReturned(); 627 assertClosed(closeable1, closeable2, closeable3); 628 } 629 testTransformAsync_throws()630 public void testTransformAsync_throws() throws Exception { 631 ClosingFuture<Object> closingFuture = 632 ClosingFuture.from(immediateFuture("value")) 633 .transformAsync( 634 new AsyncClosingFunction<String, Object>() { 635 @Override 636 public ClosingFuture<Object> apply(DeferredCloser closer, String v) 637 throws Exception { 638 closer.eventuallyClose(closeable1, closingExecutor); 639 closer.eventuallyClose(closeable2, closingExecutor); 640 throw exception; 641 } 642 }, 643 executor); 644 assertFinallyFailsWithException(closingFuture); 645 waitUntilClosed(closingFuture); 646 assertClosed(closeable1, closeable2); 647 } 648 testTransformAsync_failed()649 public void testTransformAsync_failed() throws Exception { 650 ClosingFuture<Object> closingFuture = 651 ClosingFuture.from(immediateFuture("value")) 652 .transformAsync( 653 new AsyncClosingFunction<String, Object>() { 654 @Override 655 public ClosingFuture<Object> apply(DeferredCloser closer, String v) 656 throws Exception { 657 closer.eventuallyClose(closeable1, closingExecutor); 658 closer.eventuallyClose(closeable2, closingExecutor); 659 return failedClosingFuture(); 660 } 661 }, 662 executor); 663 assertFinallyFailsWithException(closingFuture); 664 waitUntilClosed(closingFuture); 665 assertClosed(closeable1, closeable2); 666 } 667 testTransformAsync_withoutCloser()668 public void testTransformAsync_withoutCloser() throws Exception { 669 ClosingFuture<String> closingFuture = 670 ClosingFuture.submit( 671 new ClosingCallable<TestCloseable>() { 672 @Override 673 public TestCloseable call(DeferredCloser closer) throws Exception { 674 return closer.eventuallyClose(closeable1, closingExecutor); 675 } 676 }, 677 executor) 678 .transformAsync( 679 ClosingFuture.withoutCloser( 680 new AsyncFunction<TestCloseable, String>() { 681 @Override 682 public ListenableFuture<String> apply(TestCloseable v) throws Exception { 683 assertThat(v).isSameInstanceAs(closeable1); 684 assertStillOpen(closeable1); 685 return immediateFuture("value"); 686 } 687 }), 688 executor); 689 assertThat(getFinalValue(closingFuture)).isEqualTo("value"); 690 waitUntilClosed(closingFuture); 691 assertClosed(closeable1); 692 } 693 testWhenAllComplete_call()694 public void testWhenAllComplete_call() throws Exception { 695 final ClosingFuture<String> input1 = ClosingFuture.from(immediateFuture("value1")); 696 final ClosingFuture<Object> input2Failed = failedClosingFuture(); 697 final ClosingFuture<String> nonInput = ClosingFuture.from(immediateFuture("value3")); 698 final AtomicReference<ClosingFuture.Peeker> capturedPeeker = new AtomicReference<>(); 699 ClosingFuture<TestCloseable> closingFuture = 700 ClosingFuture.whenAllComplete(ImmutableList.of(input1, input2Failed)) 701 .call( 702 new CombiningCallable<TestCloseable>() { 703 @Override 704 public TestCloseable call(DeferredCloser closer, Peeker peeker) throws Exception { 705 closer.eventuallyClose(closeable1, closingExecutor); 706 assertThat(peeker.getDone(input1)).isSameInstanceAs("value1"); 707 try { 708 peeker.getDone(input2Failed); 709 fail("Peeker.getDone() should fail for failed inputs"); 710 } catch (ExecutionException expected) { 711 } 712 try { 713 peeker.getDone(nonInput); 714 fail("Peeker should not be able to peek into non-input ClosingFuture."); 715 } catch (IllegalArgumentException expected) { 716 } 717 capturedPeeker.set(peeker); 718 return closeable2; 719 } 720 }, 721 executor); 722 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); 723 waitUntilClosed(closingFuture); 724 assertStillOpen(closeable2); 725 assertClosed(closeable1); 726 assertThrows(IllegalStateException.class, () -> capturedPeeker.get().getDone(input1)); 727 } 728 testWhenAllComplete_call_cancelledPipeline()729 public void testWhenAllComplete_call_cancelledPipeline() throws Exception { 730 ClosingFuture<TestCloseable> closingFuture = 731 ClosingFuture.whenAllComplete( 732 ImmutableList.of( 733 ClosingFuture.from(immediateFuture(closeable1)), 734 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))) 735 .call( 736 waiter.waitFor( 737 new CombiningCallable<TestCloseable>() { 738 @Override 739 public TestCloseable call(DeferredCloser closer, Peeker peeker) 740 throws Exception { 741 awaitUninterruptibly(futureCancelled); 742 closer.eventuallyClose(closeable1, closingExecutor); 743 return closeable3; 744 } 745 }), 746 executor); 747 waiter.awaitStarted(); 748 cancelFinalStepAndWait(closingFuture); 749 waiter.awaitReturned(); 750 assertClosed(closeable1, closeable2); 751 assertStillOpen(closeable3); 752 } 753 testWhenAllComplete_call_throws()754 public void testWhenAllComplete_call_throws() throws Exception { 755 ClosingFuture<Object> closingFuture = 756 ClosingFuture.whenAllComplete( 757 ImmutableList.of( 758 ClosingFuture.from(immediateFuture(closeable1)), 759 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))) 760 .call( 761 new CombiningCallable<Object>() { 762 @Override 763 public Object call(DeferredCloser closer, Peeker peeker) throws Exception { 764 closer.eventuallyClose(closeable3, closingExecutor); 765 throw exception; 766 } 767 }, 768 executor); 769 assertFinallyFailsWithException(closingFuture); 770 waitUntilClosed(closingFuture); 771 assertStillOpen(closeable1); 772 assertClosed(closeable2, closeable3); 773 } 774 testWhenAllComplete_callAsync()775 public void testWhenAllComplete_callAsync() throws Exception { 776 final ClosingFuture<String> input1 = ClosingFuture.from(immediateFuture("value1")); 777 final ClosingFuture<Object> input2Failed = failedClosingFuture(); 778 final ClosingFuture<String> nonInput = ClosingFuture.from(immediateFuture("value3")); 779 final AtomicReference<ClosingFuture.Peeker> capturedPeeker = new AtomicReference<>(); 780 ClosingFuture<TestCloseable> closingFuture = 781 ClosingFuture.whenAllComplete(ImmutableList.of(input1, input2Failed)) 782 .callAsync( 783 new AsyncCombiningCallable<TestCloseable>() { 784 @Override 785 public ClosingFuture<TestCloseable> call(DeferredCloser closer, Peeker peeker) 786 throws Exception { 787 closer.eventuallyClose(closeable1, closingExecutor); 788 assertThat(peeker.getDone(input1)).isSameInstanceAs("value1"); 789 try { 790 peeker.getDone(input2Failed); 791 fail("Peeker should fail for failed inputs"); 792 } catch (ExecutionException expected) { 793 } 794 try { 795 peeker.getDone(nonInput); 796 fail("Peeker should not be able to peek into non-input ClosingFuture."); 797 } catch (IllegalArgumentException expected) { 798 } 799 capturedPeeker.set(peeker); 800 return ClosingFuture.eventuallyClosing( 801 immediateFuture(closeable2), closingExecutor); 802 } 803 }, 804 executor); 805 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); 806 waitUntilClosed(closingFuture); 807 assertClosed(closeable1, closeable2); 808 assertThrows(IllegalStateException.class, () -> capturedPeeker.get().getDone(input1)); 809 } 810 testWhenAllComplete_callAsync_cancelledPipeline()811 public void testWhenAllComplete_callAsync_cancelledPipeline() throws Exception { 812 ClosingFuture<TestCloseable> closingFuture = 813 ClosingFuture.whenAllComplete( 814 ImmutableList.of( 815 ClosingFuture.from(immediateFuture(closeable1)), 816 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))) 817 .callAsync( 818 waiter.waitFor( 819 new AsyncCombiningCallable<TestCloseable>() { 820 @Override 821 public ClosingFuture<TestCloseable> call(DeferredCloser closer, Peeker peeker) 822 throws Exception { 823 awaitUninterruptibly(futureCancelled); 824 closer.eventuallyClose(closeable1, closingExecutor); 825 return ClosingFuture.eventuallyClosing( 826 immediateFuture(closeable3), closingExecutor); 827 } 828 }), 829 executor); 830 waiter.awaitStarted(); 831 cancelFinalStepAndWait(closingFuture); 832 waiter.awaitReturned(); 833 assertClosed(closeable1, closeable2, closeable3); 834 } 835 testWhenAllComplete_callAsync_throws()836 public void testWhenAllComplete_callAsync_throws() throws Exception { 837 ClosingFuture<Object> closingFuture = 838 ClosingFuture.whenAllComplete( 839 ImmutableList.of( 840 ClosingFuture.from(immediateFuture(closeable1)), 841 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))) 842 .callAsync( 843 new AsyncCombiningCallable<Object>() { 844 @Override 845 public ClosingFuture<Object> call(DeferredCloser closer, Peeker peeker) 846 throws Exception { 847 closer.eventuallyClose(closeable3, closingExecutor); 848 throw exception; 849 } 850 }, 851 executor); 852 assertFinallyFailsWithException(closingFuture); 853 waitUntilClosed(closingFuture); 854 assertStillOpen(closeable1); 855 assertClosed(closeable2, closeable3); 856 } 857 858 // We don't need to test the happy case for SuccessfulCombiner.call(Async) because it's the same 859 // as Combiner. 860 testWhenAllSucceed_call_failedInput()861 public void testWhenAllSucceed_call_failedInput() throws Exception { 862 assertFinallyFailsWithException( 863 ClosingFuture.whenAllSucceed( 864 ImmutableList.of( 865 ClosingFuture.from(immediateFuture("value")), failedClosingFuture())) 866 .call( 867 new CombiningCallable<Object>() { 868 @Override 869 public Object call(DeferredCloser closer, Peeker peeker) throws Exception { 870 expect.fail(); 871 throw new AssertionError(); 872 } 873 }, 874 executor)); 875 } 876 testWhenAllSucceed_callAsync_failedInput()877 public void testWhenAllSucceed_callAsync_failedInput() throws Exception { 878 assertFinallyFailsWithException( 879 ClosingFuture.whenAllSucceed( 880 ImmutableList.of( 881 ClosingFuture.from(immediateFuture("value")), failedClosingFuture())) 882 .callAsync( 883 new AsyncCombiningCallable<Object>() { 884 @Override 885 public ClosingFuture<Object> call(DeferredCloser closer, Peeker peeker) 886 throws Exception { 887 expect.fail(); 888 throw new AssertionError(); 889 } 890 }, 891 executor)); 892 } 893 testWhenAllSucceed2_call()894 public void testWhenAllSucceed2_call() throws ExecutionException, IOException { 895 ClosingFuture<TestCloseable> closingFuture = 896 ClosingFuture.whenAllSucceed( 897 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 898 ClosingFuture.from(immediateFuture("value1"))) 899 .call( 900 new ClosingFunction2<TestCloseable, String, TestCloseable>() { 901 @Override 902 public TestCloseable apply(DeferredCloser closer, TestCloseable v1, String v2) 903 throws Exception { 904 assertThat(v1).isEqualTo(closeable1); 905 assertThat(v2).isEqualTo("value1"); 906 assertStillOpen(closeable1); 907 closer.eventuallyClose(closeable2, closingExecutor); 908 return closeable2; 909 } 910 }, 911 executor); 912 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); 913 waitUntilClosed(closingFuture); 914 assertClosed(closeable1, closeable2); 915 } 916 testWhenAllSucceed2_call_failedInput()917 public void testWhenAllSucceed2_call_failedInput() throws ExecutionException, IOException { 918 ClosingFuture<Object> closingFuture = 919 ClosingFuture.whenAllSucceed( 920 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 921 failedClosingFuture()) 922 .call( 923 new ClosingFunction2<TestCloseable, Object, Object>() { 924 @Override 925 public Object apply(DeferredCloser closer, TestCloseable v1, Object v2) 926 throws Exception { 927 expect.fail(); 928 throw new AssertionError(); 929 } 930 }, 931 executor); 932 assertFinallyFailsWithException(closingFuture); 933 waitUntilClosed(closingFuture); 934 assertClosed(closeable1); 935 } 936 testWhenAllSucceed2_call_cancelledPipeline()937 public void testWhenAllSucceed2_call_cancelledPipeline() throws Exception { 938 ClosingFuture<TestCloseable> closingFuture = 939 ClosingFuture.whenAllSucceed( 940 ClosingFuture.from(immediateFuture(closeable1)), 941 ClosingFuture.from(immediateFuture(closeable2))) 942 .call( 943 waiter.waitFor( 944 new ClosingFunction2<TestCloseable, TestCloseable, TestCloseable>() { 945 @Override 946 public TestCloseable apply( 947 DeferredCloser closer, TestCloseable v1, TestCloseable v2) 948 throws Exception { 949 awaitUninterruptibly(futureCancelled); 950 closer.eventuallyClose(closeable1, closingExecutor); 951 closer.eventuallyClose(closeable2, closingExecutor); 952 return closeable3; 953 } 954 }), 955 executor); 956 waiter.awaitStarted(); 957 cancelFinalStepAndWait(closingFuture); 958 // not closed until the function returns 959 assertStillOpen(closeable1, closeable2); 960 waiter.awaitReturned(); 961 assertClosed(closeable1, closeable2); 962 assertStillOpen(closeable3); 963 } 964 testWhenAllSucceed2_call_throws()965 public void testWhenAllSucceed2_call_throws() throws Exception { 966 ClosingFuture<Object> closingFuture = 967 ClosingFuture.whenAllSucceed( 968 ClosingFuture.from(immediateFuture(closeable1)), 969 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)) 970 .call( 971 new ClosingFunction2<TestCloseable, TestCloseable, Object>() { 972 @Override 973 public Object apply(DeferredCloser closer, TestCloseable v1, TestCloseable v2) 974 throws Exception { 975 closer.eventuallyClose(closeable3, closingExecutor); 976 throw exception; 977 } 978 }, 979 executor); 980 assertFinallyFailsWithException(closingFuture); 981 waitUntilClosed(closingFuture); 982 assertStillOpen(closeable1); 983 assertClosed(closeable2, closeable3); 984 } 985 testWhenAllSucceed2_callAsync()986 public void testWhenAllSucceed2_callAsync() throws ExecutionException, IOException { 987 ClosingFuture<TestCloseable> closingFuture = 988 ClosingFuture.whenAllSucceed( 989 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 990 ClosingFuture.from(immediateFuture("value1"))) 991 .callAsync( 992 new AsyncClosingFunction2<TestCloseable, String, TestCloseable>() { 993 @Override 994 public ClosingFuture<TestCloseable> apply( 995 DeferredCloser closer, TestCloseable v1, String v2) throws Exception { 996 assertThat(v1).isEqualTo(closeable1); 997 assertThat(v2).isEqualTo("value1"); 998 assertStillOpen(closeable1); 999 closer.eventuallyClose(closeable2, closingExecutor); 1000 return ClosingFuture.eventuallyClosing( 1001 immediateFuture(closeable3), closingExecutor); 1002 } 1003 }, 1004 executor); 1005 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable3); 1006 waitUntilClosed(closingFuture); 1007 assertClosed(closeable1, closeable2, closeable3); 1008 } 1009 testWhenAllSucceed2_callAsync_failedInput()1010 public void testWhenAllSucceed2_callAsync_failedInput() throws ExecutionException, IOException { 1011 ClosingFuture<Object> closingFuture = 1012 ClosingFuture.whenAllSucceed( 1013 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1014 failedClosingFuture()) 1015 .callAsync( 1016 new AsyncClosingFunction2<TestCloseable, Object, Object>() { 1017 @Override 1018 public ClosingFuture<Object> apply( 1019 DeferredCloser closer, TestCloseable v1, Object v2) throws Exception { 1020 expect.fail(); 1021 throw new AssertionError(); 1022 } 1023 }, 1024 executor); 1025 assertFinallyFailsWithException(closingFuture); 1026 waitUntilClosed(closingFuture); 1027 assertClosed(closeable1); 1028 } 1029 testWhenAllSucceed2_callAsync_cancelledPipeline()1030 public void testWhenAllSucceed2_callAsync_cancelledPipeline() throws Exception { 1031 ClosingFuture<TestCloseable> closingFuture = 1032 ClosingFuture.whenAllSucceed( 1033 ClosingFuture.from(immediateFuture(closeable1)), 1034 ClosingFuture.from(immediateFuture(closeable2))) 1035 .callAsync( 1036 waiter.waitFor( 1037 new AsyncClosingFunction2<TestCloseable, TestCloseable, TestCloseable>() { 1038 @Override 1039 public ClosingFuture<TestCloseable> apply( 1040 DeferredCloser closer, TestCloseable v1, TestCloseable v2) 1041 throws Exception { 1042 awaitUninterruptibly(futureCancelled); 1043 closer.eventuallyClose(closeable1, closingExecutor); 1044 closer.eventuallyClose(closeable2, closingExecutor); 1045 return ClosingFuture.eventuallyClosing( 1046 immediateFuture(closeable3), closingExecutor); 1047 } 1048 }), 1049 executor); 1050 waiter.awaitStarted(); 1051 cancelFinalStepAndWait(closingFuture); 1052 // not closed until the function returns 1053 assertStillOpen(closeable1, closeable2, closeable3); 1054 waiter.awaitReturned(); 1055 assertClosed(closeable1, closeable2, closeable3); 1056 } 1057 testWhenAllSucceed2_callAsync_throws()1058 public void testWhenAllSucceed2_callAsync_throws() throws Exception { 1059 ClosingFuture<Object> closingFuture = 1060 ClosingFuture.whenAllSucceed( 1061 ClosingFuture.from(immediateFuture(closeable1)), 1062 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)) 1063 .callAsync( 1064 new AsyncClosingFunction2<TestCloseable, TestCloseable, Object>() { 1065 @Override 1066 public ClosingFuture<Object> apply( 1067 DeferredCloser closer, TestCloseable v1, TestCloseable v2) throws Exception { 1068 closer.eventuallyClose(closeable3, closingExecutor); 1069 throw exception; 1070 } 1071 }, 1072 executor); 1073 assertFinallyFailsWithException(closingFuture); 1074 waitUntilClosed(closingFuture); 1075 assertStillOpen(closeable1); 1076 assertClosed(closeable2, closeable3); 1077 } 1078 testWhenAllSucceed3_call()1079 public void testWhenAllSucceed3_call() throws ExecutionException, IOException { 1080 ClosingFuture<TestCloseable> closingFuture = 1081 ClosingFuture.whenAllSucceed( 1082 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1083 ClosingFuture.from(immediateFuture("value2")), 1084 ClosingFuture.from(immediateFuture("value3"))) 1085 .call( 1086 new ClosingFunction3<TestCloseable, String, String, TestCloseable>() { 1087 @Override 1088 public TestCloseable apply( 1089 DeferredCloser closer, TestCloseable v1, String v2, String v3) 1090 throws Exception { 1091 assertThat(v1).isEqualTo(closeable1); 1092 assertThat(v2).isEqualTo("value2"); 1093 assertThat(v3).isEqualTo("value3"); 1094 assertStillOpen(closeable1); 1095 closer.eventuallyClose(closeable2, closingExecutor); 1096 return closeable2; 1097 } 1098 }, 1099 executor); 1100 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); 1101 waitUntilClosed(closingFuture); 1102 assertClosed(closeable1, closeable2); 1103 } 1104 testWhenAllSucceed3_call_failedInput()1105 public void testWhenAllSucceed3_call_failedInput() throws ExecutionException, IOException { 1106 ClosingFuture<Object> closingFuture = 1107 ClosingFuture.whenAllSucceed( 1108 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1109 failedClosingFuture(), 1110 ClosingFuture.from(immediateFuture("value3"))) 1111 .call( 1112 new ClosingFunction3<TestCloseable, Object, String, Object>() { 1113 @Override 1114 public Object apply(DeferredCloser closer, TestCloseable v1, Object v2, String v3) 1115 throws Exception { 1116 expect.fail(); 1117 throw new AssertionError(); 1118 } 1119 }, 1120 executor); 1121 assertFinallyFailsWithException(closingFuture); 1122 waitUntilClosed(closingFuture); 1123 assertClosed(closeable1); 1124 } 1125 testWhenAllSucceed3_call_cancelledPipeline()1126 public void testWhenAllSucceed3_call_cancelledPipeline() throws Exception { 1127 ClosingFuture<TestCloseable> closingFuture = 1128 ClosingFuture.whenAllSucceed( 1129 ClosingFuture.from(immediateFuture(closeable1)), 1130 ClosingFuture.from(immediateFuture(closeable2)), 1131 ClosingFuture.from(immediateFuture("value3"))) 1132 .call( 1133 waiter.waitFor( 1134 new ClosingFunction3<TestCloseable, TestCloseable, String, TestCloseable>() { 1135 @Override 1136 public TestCloseable apply( 1137 DeferredCloser closer, TestCloseable v1, TestCloseable v2, String v3) 1138 throws Exception { 1139 awaitUninterruptibly(futureCancelled); 1140 closer.eventuallyClose(closeable1, closingExecutor); 1141 closer.eventuallyClose(closeable2, closingExecutor); 1142 return closeable3; 1143 } 1144 }), 1145 executor); 1146 waiter.awaitStarted(); 1147 cancelFinalStepAndWait(closingFuture); 1148 // not closed until the function returns 1149 assertStillOpen(closeable1, closeable2); 1150 waiter.awaitReturned(); 1151 assertClosed(closeable1, closeable2); 1152 assertStillOpen(closeable3); 1153 } 1154 testWhenAllSucceed3_call_throws()1155 public void testWhenAllSucceed3_call_throws() throws Exception { 1156 ClosingFuture<Object> closingFuture = 1157 ClosingFuture.whenAllSucceed( 1158 ClosingFuture.from(immediateFuture(closeable1)), 1159 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor), 1160 ClosingFuture.from(immediateFuture("value3"))) 1161 .call( 1162 new ClosingFunction3<TestCloseable, TestCloseable, String, Object>() { 1163 @Override 1164 public Object apply( 1165 DeferredCloser closer, TestCloseable v1, TestCloseable v2, String v3) 1166 throws Exception { 1167 closer.eventuallyClose(closeable3, closingExecutor); 1168 throw exception; 1169 } 1170 }, 1171 executor); 1172 assertFinallyFailsWithException(closingFuture); 1173 waitUntilClosed(closingFuture); 1174 assertStillOpen(closeable1); 1175 assertClosed(closeable2, closeable3); 1176 } 1177 testWhenAllSucceed4_call()1178 public void testWhenAllSucceed4_call() throws ExecutionException, IOException { 1179 ClosingFuture<TestCloseable> closingFuture = 1180 ClosingFuture.whenAllSucceed( 1181 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1182 ClosingFuture.from(immediateFuture("value2")), 1183 ClosingFuture.from(immediateFuture("value3")), 1184 ClosingFuture.from(immediateFuture("value4"))) 1185 .call( 1186 new ClosingFunction4<TestCloseable, String, String, String, TestCloseable>() { 1187 @Override 1188 public TestCloseable apply( 1189 DeferredCloser closer, TestCloseable v1, String v2, String v3, String v4) 1190 throws Exception { 1191 assertThat(v1).isEqualTo(closeable1); 1192 assertThat(v2).isEqualTo("value2"); 1193 assertThat(v3).isEqualTo("value3"); 1194 assertThat(v4).isEqualTo("value4"); 1195 assertStillOpen(closeable1); 1196 closer.eventuallyClose(closeable2, closingExecutor); 1197 return closeable2; 1198 } 1199 }, 1200 executor); 1201 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); 1202 waitUntilClosed(closingFuture); 1203 assertClosed(closeable1, closeable2); 1204 } 1205 testWhenAllSucceed4_call_failedInput()1206 public void testWhenAllSucceed4_call_failedInput() throws ExecutionException, IOException { 1207 ClosingFuture<Object> closingFuture = 1208 ClosingFuture.whenAllSucceed( 1209 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1210 failedClosingFuture(), 1211 ClosingFuture.from(immediateFuture("value3")), 1212 ClosingFuture.from(immediateFuture("value4"))) 1213 .call( 1214 new ClosingFunction4<TestCloseable, Object, String, String, Object>() { 1215 @Override 1216 public Object apply( 1217 DeferredCloser closer, TestCloseable v1, Object v2, String v3, String v4) 1218 throws Exception { 1219 expect.fail(); 1220 throw new AssertionError(); 1221 } 1222 }, 1223 executor); 1224 assertFinallyFailsWithException(closingFuture); 1225 waitUntilClosed(closingFuture); 1226 assertClosed(closeable1); 1227 } 1228 testWhenAllSucceed4_call_cancelledPipeline()1229 public void testWhenAllSucceed4_call_cancelledPipeline() throws Exception { 1230 ClosingFuture<TestCloseable> closingFuture = 1231 ClosingFuture.whenAllSucceed( 1232 ClosingFuture.from(immediateFuture(closeable1)), 1233 ClosingFuture.from(immediateFuture(closeable2)), 1234 ClosingFuture.from(immediateFuture("value3")), 1235 ClosingFuture.from(immediateFuture("value4"))) 1236 .call( 1237 waiter.waitFor( 1238 new ClosingFunction4< 1239 TestCloseable, TestCloseable, String, String, TestCloseable>() { 1240 @Override 1241 public TestCloseable apply( 1242 DeferredCloser closer, 1243 TestCloseable v1, 1244 TestCloseable v2, 1245 String v3, 1246 String v4) 1247 throws Exception { 1248 awaitUninterruptibly(futureCancelled); 1249 closer.eventuallyClose(closeable1, closingExecutor); 1250 closer.eventuallyClose(closeable2, closingExecutor); 1251 return closeable3; 1252 } 1253 }), 1254 executor); 1255 waiter.awaitStarted(); 1256 cancelFinalStepAndWait(closingFuture); 1257 // not closed until the function returns 1258 assertStillOpen(closeable1, closeable2); 1259 waiter.awaitReturned(); 1260 assertClosed(closeable1, closeable2); 1261 assertStillOpen(closeable3); 1262 } 1263 testWhenAllSucceed4_call_throws()1264 public void testWhenAllSucceed4_call_throws() throws Exception { 1265 ClosingFuture<Object> closingFuture = 1266 ClosingFuture.whenAllSucceed( 1267 ClosingFuture.from(immediateFuture(closeable1)), 1268 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor), 1269 ClosingFuture.from(immediateFuture("value3")), 1270 ClosingFuture.from(immediateFuture("value4"))) 1271 .call( 1272 new ClosingFunction4<TestCloseable, TestCloseable, String, String, Object>() { 1273 @Override 1274 public Object apply( 1275 DeferredCloser closer, 1276 TestCloseable v1, 1277 TestCloseable v2, 1278 String v3, 1279 String v4) 1280 throws Exception { 1281 closer.eventuallyClose(closeable3, closingExecutor); 1282 throw exception; 1283 } 1284 }, 1285 executor); 1286 assertFinallyFailsWithException(closingFuture); 1287 waitUntilClosed(closingFuture); 1288 assertStillOpen(closeable1); 1289 assertClosed(closeable2, closeable3); 1290 } 1291 testWhenAllSucceed5_call()1292 public void testWhenAllSucceed5_call() throws ExecutionException, IOException { 1293 ClosingFuture<TestCloseable> closingFuture = 1294 ClosingFuture.whenAllSucceed( 1295 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1296 ClosingFuture.from(immediateFuture("value2")), 1297 ClosingFuture.from(immediateFuture("value3")), 1298 ClosingFuture.from(immediateFuture("value4")), 1299 ClosingFuture.from(immediateFuture("value5"))) 1300 .call( 1301 new ClosingFunction5< 1302 TestCloseable, String, String, String, String, TestCloseable>() { 1303 @Override 1304 public TestCloseable apply( 1305 DeferredCloser closer, 1306 TestCloseable v1, 1307 String v2, 1308 String v3, 1309 String v4, 1310 String v5) 1311 throws Exception { 1312 assertThat(v1).isEqualTo(closeable1); 1313 assertThat(v2).isEqualTo("value2"); 1314 assertThat(v3).isEqualTo("value3"); 1315 assertThat(v4).isEqualTo("value4"); 1316 assertThat(v5).isEqualTo("value5"); 1317 assertStillOpen(closeable1); 1318 closer.eventuallyClose(closeable2, closingExecutor); 1319 return closeable2; 1320 } 1321 }, 1322 executor); 1323 assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); 1324 waitUntilClosed(closingFuture); 1325 assertClosed(closeable1, closeable2); 1326 } 1327 testWhenAllSucceed5_call_failedInput()1328 public void testWhenAllSucceed5_call_failedInput() throws ExecutionException, IOException { 1329 ClosingFuture<Object> closingFuture = 1330 ClosingFuture.whenAllSucceed( 1331 ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), 1332 failedClosingFuture(), 1333 ClosingFuture.from(immediateFuture("value3")), 1334 ClosingFuture.from(immediateFuture("value4")), 1335 ClosingFuture.from(immediateFuture("value5"))) 1336 .call( 1337 new ClosingFunction5<TestCloseable, Object, String, String, String, Object>() { 1338 @Override 1339 public Object apply( 1340 DeferredCloser closer, 1341 TestCloseable v1, 1342 Object v2, 1343 String v3, 1344 String v4, 1345 String v5) 1346 throws Exception { 1347 expect.fail(); 1348 throw new AssertionError(); 1349 } 1350 }, 1351 executor); 1352 assertFinallyFailsWithException(closingFuture); 1353 waitUntilClosed(closingFuture); 1354 assertClosed(closeable1); 1355 } 1356 testWhenAllSucceed5_call_cancelledPipeline()1357 public void testWhenAllSucceed5_call_cancelledPipeline() throws Exception { 1358 ClosingFuture<TestCloseable> closingFuture = 1359 ClosingFuture.whenAllSucceed( 1360 ClosingFuture.from(immediateFuture(closeable1)), 1361 ClosingFuture.from(immediateFuture(closeable2)), 1362 ClosingFuture.from(immediateFuture("value3")), 1363 ClosingFuture.from(immediateFuture("value4")), 1364 ClosingFuture.from(immediateFuture("value5"))) 1365 .call( 1366 waiter.waitFor( 1367 new ClosingFunction5< 1368 TestCloseable, TestCloseable, String, String, String, TestCloseable>() { 1369 @Override 1370 public TestCloseable apply( 1371 DeferredCloser closer, 1372 TestCloseable v1, 1373 TestCloseable v2, 1374 String v3, 1375 String v4, 1376 String v5) 1377 throws Exception { 1378 awaitUninterruptibly(futureCancelled); 1379 closer.eventuallyClose(closeable1, closingExecutor); 1380 closer.eventuallyClose(closeable2, closingExecutor); 1381 return closeable3; 1382 } 1383 }), 1384 executor); 1385 waiter.awaitStarted(); 1386 cancelFinalStepAndWait(closingFuture); 1387 // not closed until the function returns 1388 assertStillOpen(closeable1, closeable2); 1389 waiter.awaitReturned(); 1390 assertClosed(closeable1, closeable2); 1391 assertStillOpen(closeable3); 1392 } 1393 testWhenAllSucceed5_call_throws()1394 public void testWhenAllSucceed5_call_throws() throws Exception { 1395 ClosingFuture<Object> closingFuture = 1396 ClosingFuture.whenAllSucceed( 1397 ClosingFuture.from(immediateFuture(closeable1)), 1398 ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor), 1399 ClosingFuture.from(immediateFuture("value3")), 1400 ClosingFuture.from(immediateFuture("value4")), 1401 ClosingFuture.from(immediateFuture("value5"))) 1402 .call( 1403 new ClosingFunction5< 1404 TestCloseable, TestCloseable, String, String, String, Object>() { 1405 @Override 1406 public Object apply( 1407 DeferredCloser closer, 1408 TestCloseable v1, 1409 TestCloseable v2, 1410 String v3, 1411 String v4, 1412 String v5) 1413 throws Exception { 1414 closer.eventuallyClose(closeable3, closingExecutor); 1415 throw exception; 1416 } 1417 }, 1418 executor); 1419 assertFinallyFailsWithException(closingFuture); 1420 waitUntilClosed(closingFuture); 1421 assertStillOpen(closeable1); 1422 assertClosed(closeable2, closeable3); 1423 } 1424 testTransform_preventsFurtherOperations()1425 public void testTransform_preventsFurtherOperations() { 1426 ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); 1427 ClosingFuture<String> unused = 1428 closingFuture.transform( 1429 new ClosingFunction<String, String>() { 1430 @Override 1431 public String apply(DeferredCloser closer, String v) throws Exception { 1432 return "value2"; 1433 } 1434 }, 1435 executor); 1436 assertDerivingThrowsIllegalStateException(closingFuture); 1437 assertFinalStepThrowsIllegalStateException(closingFuture); 1438 } 1439 testTransformAsync_preventsFurtherOperations()1440 public void testTransformAsync_preventsFurtherOperations() { 1441 ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); 1442 ClosingFuture<String> unused = 1443 closingFuture.transformAsync( 1444 new AsyncClosingFunction<String, String>() { 1445 @Override 1446 public ClosingFuture<String> apply(DeferredCloser closer, String v) throws Exception { 1447 return ClosingFuture.from(immediateFuture("value2")); 1448 } 1449 }, 1450 executor); 1451 assertDerivingThrowsIllegalStateException(closingFuture); 1452 assertFinalStepThrowsIllegalStateException(closingFuture); 1453 } 1454 testCatching_preventsFurtherOperations()1455 public void testCatching_preventsFurtherOperations() { 1456 ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); 1457 ClosingFuture<String> unused = 1458 closingFuture.catching( 1459 Exception.class, 1460 new ClosingFunction<Exception, String>() { 1461 @Override 1462 public String apply(DeferredCloser closer, Exception x) throws Exception { 1463 return "value2"; 1464 } 1465 }, 1466 executor); 1467 assertDerivingThrowsIllegalStateException(closingFuture); 1468 assertFinalStepThrowsIllegalStateException(closingFuture); 1469 } 1470 testCatchingAsync_preventsFurtherOperations()1471 public void testCatchingAsync_preventsFurtherOperations() { 1472 ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); 1473 ClosingFuture<String> unused = 1474 closingFuture.catchingAsync( 1475 Exception.class, 1476 ClosingFuture.withoutCloser( 1477 new AsyncFunction<Exception, String>() { 1478 @Override 1479 public ListenableFuture<String> apply(Exception x) throws Exception { 1480 return immediateFuture("value2"); 1481 } 1482 }), 1483 executor); 1484 assertDerivingThrowsIllegalStateException(closingFuture); 1485 assertFinalStepThrowsIllegalStateException(closingFuture); 1486 } 1487 testWhenAllComplete_preventsFurtherOperations()1488 public void testWhenAllComplete_preventsFurtherOperations() { 1489 ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); 1490 Combiner unused = ClosingFuture.whenAllComplete(asList(closingFuture)); 1491 assertDerivingThrowsIllegalStateException(closingFuture); 1492 assertFinalStepThrowsIllegalStateException(closingFuture); 1493 } 1494 testWhenAllSucceed_preventsFurtherOperations()1495 public void testWhenAllSucceed_preventsFurtherOperations() { 1496 ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); 1497 Combiner unused = ClosingFuture.whenAllSucceed(asList(closingFuture)); 1498 assertDerivingThrowsIllegalStateException(closingFuture); 1499 assertFinalStepThrowsIllegalStateException(closingFuture); 1500 } 1501 assertDerivingThrowsIllegalStateException( ClosingFuture<String> closingFuture)1502 protected final void assertDerivingThrowsIllegalStateException( 1503 ClosingFuture<String> closingFuture) { 1504 try { 1505 closingFuture.transform( 1506 new ClosingFunction<String, String>() { 1507 @Override 1508 public String apply(DeferredCloser closer3, String v1) throws Exception { 1509 return "value3"; 1510 } 1511 }, 1512 executor); 1513 fail(); 1514 } catch (IllegalStateException expected5) { 1515 } 1516 try { 1517 closingFuture.transformAsync( 1518 new AsyncClosingFunction<String, String>() { 1519 @Override 1520 public ClosingFuture<String> apply(DeferredCloser closer2, String v) throws Exception { 1521 return ClosingFuture.from(immediateFuture("value3")); 1522 } 1523 }, 1524 executor); 1525 fail(); 1526 } catch (IllegalStateException expected4) { 1527 } 1528 try { 1529 closingFuture.catching( 1530 Exception.class, 1531 new ClosingFunction<Exception, String>() { 1532 @Override 1533 public String apply(DeferredCloser closer1, Exception x1) throws Exception { 1534 return "value3"; 1535 } 1536 }, 1537 executor); 1538 fail(); 1539 } catch (IllegalStateException expected3) { 1540 } 1541 try { 1542 closingFuture.catchingAsync( 1543 Exception.class, 1544 new AsyncClosingFunction<Exception, String>() { 1545 @Override 1546 public ClosingFuture<String> apply(DeferredCloser closer, Exception x) 1547 throws Exception { 1548 return ClosingFuture.from(immediateFuture("value3")); 1549 } 1550 }, 1551 executor); 1552 fail(); 1553 } catch (IllegalStateException expected2) { 1554 } 1555 try { 1556 ClosingFuture.whenAllComplete(asList(closingFuture)); 1557 fail(); 1558 } catch (IllegalStateException expected1) { 1559 } 1560 try { 1561 ClosingFuture.whenAllSucceed(asList(closingFuture)); 1562 fail(); 1563 } catch (IllegalStateException expected) { 1564 } 1565 } 1566 1567 /** Asserts that marking this step a final step throws {@link IllegalStateException}. */ assertFinalStepThrowsIllegalStateException(ClosingFuture<?> closingFuture)1568 protected void assertFinalStepThrowsIllegalStateException(ClosingFuture<?> closingFuture) { 1569 try { 1570 closingFuture.finishToFuture(); 1571 fail(); 1572 } catch (IllegalStateException expected) { 1573 } 1574 try { 1575 closingFuture.finishToValueAndCloser(new NoOpValueAndCloserConsumer<>(), executor); 1576 fail(); 1577 } catch (IllegalStateException expected) { 1578 } 1579 } 1580 1581 // Avoid infinite recursion if a closeable's close() method throws RejectedExecutionException and 1582 // is closed using the direct executor. testCloseThrowsRejectedExecutionException()1583 public void testCloseThrowsRejectedExecutionException() throws Exception { 1584 doThrow(new RejectedExecutionException()).when(mockCloseable).close(); 1585 ClosingFuture<Closeable> closingFuture = 1586 ClosingFuture.submit( 1587 new ClosingCallable<Closeable>() { 1588 @Override 1589 public Closeable call(DeferredCloser closer) throws Exception { 1590 return closer.eventuallyClose(mockCloseable, directExecutor()); 1591 } 1592 }, 1593 executor); 1594 assertThat(getFinalValue(closingFuture)).isEqualTo(mockCloseable); 1595 waitUntilClosed(closingFuture); 1596 verify(mockCloseable, timeout(1000)).close(); 1597 } 1598 1599 /** 1600 * Marks the given step final, waits for it to be finished, and returns the value. 1601 * 1602 * @throws ExecutionException if the step failed 1603 * @throws CancellationException if the step was cancelled 1604 */ getFinalValue(ClosingFuture<T> closingFuture)1605 abstract <T> T getFinalValue(ClosingFuture<T> closingFuture) throws ExecutionException; 1606 1607 /** Marks the given step final, cancels it, and waits for the cancellation to happen. */ cancelFinalStepAndWait(ClosingFuture<TestCloseable> closingFuture)1608 abstract void cancelFinalStepAndWait(ClosingFuture<TestCloseable> closingFuture); 1609 1610 /** 1611 * Marks the given step final and waits for it to fail. Expects the failure exception to match 1612 * {@link AbstractClosingFutureTest#exception}. 1613 */ assertFinallyFailsWithException(ClosingFuture<?> closingFuture)1614 abstract void assertFinallyFailsWithException(ClosingFuture<?> closingFuture); 1615 1616 /** Waits for the given step to be canceled. */ assertBecomesCanceled(ClosingFuture<?> closingFuture)1617 abstract void assertBecomesCanceled(ClosingFuture<?> closingFuture) throws ExecutionException; 1618 1619 /** Waits for the given step's closeables to be closed. */ waitUntilClosed(ClosingFuture<?> closingFuture)1620 void waitUntilClosed(ClosingFuture<?> closingFuture) { 1621 assertTrue(awaitUninterruptibly(closingFuture.whenClosedCountDown(), 1, SECONDS)); 1622 } 1623 assertThatFutureFailsWithException(Future<?> future)1624 void assertThatFutureFailsWithException(Future<?> future) { 1625 try { 1626 getUninterruptibly(future); 1627 fail("Expected future to fail: " + future); 1628 } catch (ExecutionException e) { 1629 assertThat(e).hasCauseThat().isSameInstanceAs(exception); 1630 } 1631 } 1632 assertThatFutureBecomesCancelled(Future<?> future)1633 static void assertThatFutureBecomesCancelled(Future<?> future) throws ExecutionException { 1634 try { 1635 getUninterruptibly(future); 1636 fail("Expected future to be canceled: " + future); 1637 } catch (CancellationException expected) { 1638 } 1639 } 1640 assertStillOpen(TestCloseable closeable1, TestCloseable... moreCloseables)1641 private static void assertStillOpen(TestCloseable closeable1, TestCloseable... moreCloseables) 1642 throws IOException { 1643 for (TestCloseable closeable : asList(closeable1, moreCloseables)) { 1644 assertWithMessage("%s.stillOpen()", closeable).that(closeable.stillOpen()).isTrue(); 1645 } 1646 } 1647 assertClosed(TestCloseable closeable1, TestCloseable... moreCloseables)1648 static void assertClosed(TestCloseable closeable1, TestCloseable... moreCloseables) 1649 throws IOException { 1650 for (TestCloseable closeable : asList(closeable1, moreCloseables)) { 1651 assertWithMessage("%s.isClosed()", closeable).that(closeable.awaitClosed()).isTrue(); 1652 } 1653 } 1654 failedClosingFuture()1655 private ClosingFuture<Object> failedClosingFuture() { 1656 return ClosingFuture.from(immediateFailedFuture(exception)); 1657 } 1658 assertNoExpectedFailures()1659 private void assertNoExpectedFailures() { 1660 assertWithMessage("executor was shut down") 1661 .that(shutdownAndAwaitTermination(executor, 10, SECONDS)) 1662 .isTrue(); 1663 assertWithMessage("closingExecutor was shut down") 1664 .that(shutdownAndAwaitTermination(closingExecutor, 10, SECONDS)) 1665 .isTrue(); 1666 if (!failures.isEmpty()) { 1667 StringWriter message = new StringWriter(); 1668 PrintWriter writer = new PrintWriter(message); 1669 writer.println("Expected no failures, but found:"); 1670 for (AssertionError failure : failures) { 1671 failure.printStackTrace(writer); 1672 } 1673 failures.clear(); 1674 assertWithMessage(message.toString()).fail(); 1675 } 1676 } 1677 1678 static final class TestCloseable implements Closeable { 1679 private final CountDownLatch latch = new CountDownLatch(1); 1680 private final String name; 1681 TestCloseable(String name)1682 TestCloseable(String name) { 1683 this.name = name; 1684 } 1685 1686 @Override close()1687 public void close() throws IOException { 1688 latch.countDown(); 1689 } 1690 awaitClosed()1691 boolean awaitClosed() { 1692 return awaitUninterruptibly(latch, 10, SECONDS); 1693 } 1694 stillOpen()1695 boolean stillOpen() { 1696 return !awaitUninterruptibly(latch, 1, SECONDS); 1697 } 1698 1699 @Override toString()1700 public String toString() { 1701 return name; 1702 } 1703 } 1704 1705 static final class Waiter { 1706 private final CountDownLatch started = new CountDownLatch(1); 1707 private final CountDownLatch canReturn = new CountDownLatch(1); 1708 private final CountDownLatch returned = new CountDownLatch(1); 1709 private Object proxy; 1710 waitFor(Callable<V> callable)1711 <V> Callable<V> waitFor(Callable<V> callable) { 1712 return waitFor(callable, Callable.class); 1713 } 1714 waitFor(ClosingCallable<V> closingCallable)1715 <V> ClosingCallable<V> waitFor(ClosingCallable<V> closingCallable) { 1716 return waitFor(closingCallable, ClosingCallable.class); 1717 } 1718 waitFor(AsyncClosingCallable<V> asyncClosingCallable)1719 <V> AsyncClosingCallable<V> waitFor(AsyncClosingCallable<V> asyncClosingCallable) { 1720 return waitFor(asyncClosingCallable, AsyncClosingCallable.class); 1721 } 1722 waitFor(ClosingFunction<T, U> closingFunction)1723 <T, U> ClosingFunction<T, U> waitFor(ClosingFunction<T, U> closingFunction) { 1724 return waitFor(closingFunction, ClosingFunction.class); 1725 } 1726 waitFor(AsyncClosingFunction<T, U> asyncClosingFunction)1727 <T, U> AsyncClosingFunction<T, U> waitFor(AsyncClosingFunction<T, U> asyncClosingFunction) { 1728 return waitFor(asyncClosingFunction, AsyncClosingFunction.class); 1729 } 1730 waitFor(CombiningCallable<V> combiningCallable)1731 <V> CombiningCallable<V> waitFor(CombiningCallable<V> combiningCallable) { 1732 return waitFor(combiningCallable, CombiningCallable.class); 1733 } 1734 waitFor(AsyncCombiningCallable<V> asyncCombiningCallable)1735 <V> AsyncCombiningCallable<V> waitFor(AsyncCombiningCallable<V> asyncCombiningCallable) { 1736 return waitFor(asyncCombiningCallable, AsyncCombiningCallable.class); 1737 } 1738 waitFor(ClosingFunction2<V1, V2, U> closingFunction2)1739 <V1, V2, U> ClosingFunction2<V1, V2, U> waitFor(ClosingFunction2<V1, V2, U> closingFunction2) { 1740 return waitFor(closingFunction2, ClosingFunction2.class); 1741 } 1742 waitFor( AsyncClosingFunction2<V1, V2, U> asyncClosingFunction2)1743 <V1, V2, U> AsyncClosingFunction2<V1, V2, U> waitFor( 1744 AsyncClosingFunction2<V1, V2, U> asyncClosingFunction2) { 1745 return waitFor(asyncClosingFunction2, AsyncClosingFunction2.class); 1746 } 1747 waitFor( ClosingFunction3<V1, V2, V3, U> closingFunction3)1748 <V1, V2, V3, U> ClosingFunction3<V1, V2, V3, U> waitFor( 1749 ClosingFunction3<V1, V2, V3, U> closingFunction3) { 1750 return waitFor(closingFunction3, ClosingFunction3.class); 1751 } 1752 waitFor( ClosingFunction4<V1, V2, V3, V4, U> closingFunction4)1753 <V1, V2, V3, V4, U> ClosingFunction4<V1, V2, V3, V4, U> waitFor( 1754 ClosingFunction4<V1, V2, V3, V4, U> closingFunction4) { 1755 return waitFor(closingFunction4, ClosingFunction4.class); 1756 } 1757 waitFor( ClosingFunction5<V1, V2, V3, V4, V5, U> closingFunction5)1758 <V1, V2, V3, V4, V5, U> ClosingFunction5<V1, V2, V3, V4, V5, U> waitFor( 1759 ClosingFunction5<V1, V2, V3, V4, V5, U> closingFunction5) { 1760 return waitFor(closingFunction5, ClosingFunction5.class); 1761 } 1762 waitFor(final T delegate, final Class<T> type)1763 <T> T waitFor(final T delegate, final Class<T> type) { 1764 checkState(proxy == null); 1765 T proxyObject = 1766 Reflection.newProxy( 1767 type, 1768 new InvocationHandler() { 1769 @Override 1770 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 1771 if (!method.getDeclaringClass().equals(type)) { 1772 return method.invoke(delegate, args); 1773 } 1774 checkState(started.getCount() == 1); 1775 started.countDown(); 1776 try { 1777 return method.invoke(delegate, args); 1778 } catch (InvocationTargetException e) { 1779 throw e.getCause(); 1780 } finally { 1781 awaitUninterruptibly(canReturn); 1782 returned.countDown(); 1783 } 1784 } 1785 }); 1786 this.proxy = proxyObject; 1787 return proxyObject; 1788 } 1789 awaitStarted()1790 void awaitStarted() { 1791 assertTrue(awaitUninterruptibly(started, 10, SECONDS)); 1792 } 1793 awaitReturned()1794 void awaitReturned() { 1795 canReturn.countDown(); 1796 assertTrue(awaitUninterruptibly(returned, 10, SECONDS)); 1797 } 1798 } 1799 1800 static final class NoOpValueAndCloserConsumer<V> implements ValueAndCloserConsumer<V> { 1801 @Override accept(ValueAndCloser<V> valueAndCloser)1802 public void accept(ValueAndCloser<V> valueAndCloser) {} 1803 } 1804 } 1805