1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not 5 * use this file except in compliance with the License. You may obtain a copy of 6 * the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations under 14 * the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread; 20 import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly; 21 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; 22 import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly; 23 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly; 24 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly; 25 import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly; 26 import static com.google.common.util.concurrent.Uninterruptibles.tryLockUninterruptibly; 27 import static java.util.concurrent.Executors.newFixedThreadPool; 28 import static java.util.concurrent.TimeUnit.MILLISECONDS; 29 import static java.util.concurrent.TimeUnit.SECONDS; 30 31 import com.google.common.base.Preconditions; 32 import com.google.common.base.Stopwatch; 33 import com.google.common.testing.NullPointerTester; 34 import com.google.common.testing.TearDown; 35 import com.google.common.testing.TearDownStack; 36 import com.google.errorprone.annotations.CanIgnoreReturnValue; 37 import java.time.Duration; 38 import java.util.Date; 39 import java.util.concurrent.ArrayBlockingQueue; 40 import java.util.concurrent.BlockingQueue; 41 import java.util.concurrent.CountDownLatch; 42 import java.util.concurrent.ExecutorService; 43 import java.util.concurrent.Executors; 44 import java.util.concurrent.Future; 45 import java.util.concurrent.ScheduledExecutorService; 46 import java.util.concurrent.Semaphore; 47 import java.util.concurrent.TimeUnit; 48 import java.util.concurrent.locks.Condition; 49 import java.util.concurrent.locks.Lock; 50 import java.util.concurrent.locks.ReentrantLock; 51 import junit.framework.TestCase; 52 53 /** 54 * Tests for {@link Uninterruptibles}. 55 * 56 * @author Anthony Zana 57 */ 58 public class UninterruptiblesTest extends TestCase { 59 private static final String EXPECTED_TAKE = "expectedTake"; 60 61 /** Timeout to use when we don't expect the timeout to expire. */ 62 private static final long LONG_DELAY_MS = 2500; 63 64 private static final long SLEEP_SLACK = 2; 65 66 private final TearDownStack tearDownStack = new TearDownStack(); 67 68 // NOTE: All durations in these tests are expressed in milliseconds 69 @Override setUp()70 protected void setUp() { 71 // Clear any previous interrupt before running the test. 72 if (Thread.currentThread().isInterrupted()) { 73 throw new AssertionError( 74 "Thread interrupted on test entry. " 75 + "Some test probably didn't clear the interrupt state"); 76 } 77 78 tearDownStack.addTearDown( 79 new TearDown() { 80 @Override 81 public void tearDown() { 82 Thread.interrupted(); 83 } 84 }); 85 } 86 87 @Override tearDown()88 protected void tearDown() { 89 tearDownStack.runTearDown(); 90 } 91 testNull()92 public void testNull() throws Exception { 93 new NullPointerTester() 94 .setDefault(CountDownLatch.class, new CountDownLatch(0)) 95 .setDefault(Semaphore.class, new Semaphore(999)) 96 .testAllPublicStaticMethods(Uninterruptibles.class); 97 } 98 99 // IncrementableCountDownLatch.await() tests 100 101 // CountDownLatch.await() tests 102 103 // Condition.await() tests testConditionAwaitTimeoutExceeded()104 public void testConditionAwaitTimeoutExceeded() { 105 Stopwatch stopwatch = Stopwatch.createStarted(); 106 Condition condition = TestCondition.create(); 107 108 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 500, MILLISECONDS); 109 110 assertFalse(signaledBeforeTimeout); 111 assertAtLeastTimePassed(stopwatch, 500); 112 assertNotInterrupted(); 113 } 114 testConditionAwaitTimeoutNotExceeded()115 public void testConditionAwaitTimeoutNotExceeded() { 116 Stopwatch stopwatch = Stopwatch.createStarted(); 117 Condition condition = TestCondition.createAndSignalAfter(500, MILLISECONDS); 118 119 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS); 120 121 assertTrue(signaledBeforeTimeout); 122 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 123 assertNotInterrupted(); 124 } 125 testConditionAwaitInterruptedTimeoutExceeded()126 public void testConditionAwaitInterruptedTimeoutExceeded() { 127 Stopwatch stopwatch = Stopwatch.createStarted(); 128 Condition condition = TestCondition.create(); 129 requestInterruptIn(500); 130 131 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1000, MILLISECONDS); 132 133 assertFalse(signaledBeforeTimeout); 134 assertAtLeastTimePassed(stopwatch, 1000); 135 assertInterrupted(); 136 } 137 testConditionAwaitInterruptedTimeoutNotExceeded()138 public void testConditionAwaitInterruptedTimeoutNotExceeded() { 139 Stopwatch stopwatch = Stopwatch.createStarted(); 140 Condition condition = TestCondition.createAndSignalAfter(1000, MILLISECONDS); 141 requestInterruptIn(500); 142 143 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS); 144 145 assertTrue(signaledBeforeTimeout); 146 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 147 assertInterrupted(); 148 } 149 150 // Lock.tryLock() tests testTryLockTimeoutExceeded()151 public void testTryLockTimeoutExceeded() { 152 Stopwatch stopwatch = Stopwatch.createStarted(); 153 Lock lock = new ReentrantLock(); 154 Thread lockThread = acquireFor(lock, 5, SECONDS); 155 156 boolean lockAcquired = tryLockUninterruptibly(lock, 500, MILLISECONDS); 157 158 assertFalse(lockAcquired); 159 assertAtLeastTimePassed(stopwatch, 500); 160 assertNotInterrupted(); 161 162 // finish locking thread 163 lockThread.interrupt(); 164 } 165 testTryLockTimeoutNotExceeded()166 public void testTryLockTimeoutNotExceeded() { 167 Stopwatch stopwatch = Stopwatch.createStarted(); 168 Lock lock = new ReentrantLock(); 169 acquireFor(lock, 500, MILLISECONDS); 170 171 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS); 172 173 assertTrue(signaledBeforeTimeout); 174 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 175 assertNotInterrupted(); 176 } 177 testTryLockInterruptedTimeoutExceeded()178 public void testTryLockInterruptedTimeoutExceeded() { 179 Stopwatch stopwatch = Stopwatch.createStarted(); 180 Lock lock = new ReentrantLock(); 181 Thread lockThread = acquireFor(lock, 5, SECONDS); 182 requestInterruptIn(500); 183 184 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1000, MILLISECONDS); 185 186 assertFalse(signaledBeforeTimeout); 187 assertAtLeastTimePassed(stopwatch, 1000); 188 assertInterrupted(); 189 190 // finish locking thread 191 lockThread.interrupt(); 192 } 193 testTryLockInterruptedTimeoutNotExceeded()194 public void testTryLockInterruptedTimeoutNotExceeded() { 195 Stopwatch stopwatch = Stopwatch.createStarted(); 196 Lock lock = new ReentrantLock(); 197 acquireFor(lock, 1000, MILLISECONDS); 198 requestInterruptIn(500); 199 200 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS); 201 202 assertTrue(signaledBeforeTimeout); 203 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 204 assertInterrupted(); 205 } 206 207 // BlockingQueue.put() tests testPutWithNoWait()208 public void testPutWithNoWait() { 209 Stopwatch stopwatch = Stopwatch.createStarted(); 210 BlockingQueue<String> queue = new ArrayBlockingQueue<>(999); 211 putUninterruptibly(queue, ""); 212 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 213 assertEquals("", queue.peek()); 214 } 215 testPutNoInterrupt()216 public void testPutNoInterrupt() { 217 TimedPutQueue queue = TimedPutQueue.createWithDelay(20); 218 queue.putSuccessfully(); 219 assertNotInterrupted(); 220 } 221 testPutSingleInterrupt()222 public void testPutSingleInterrupt() { 223 TimedPutQueue queue = TimedPutQueue.createWithDelay(50); 224 requestInterruptIn(10); 225 queue.putSuccessfully(); 226 assertInterrupted(); 227 } 228 testPutMultiInterrupt()229 public void testPutMultiInterrupt() { 230 TimedPutQueue queue = TimedPutQueue.createWithDelay(100); 231 repeatedlyInterruptTestThread(20, tearDownStack); 232 queue.putSuccessfully(); 233 assertInterrupted(); 234 } 235 236 // BlockingQueue.take() tests testTakeWithNoWait()237 public void testTakeWithNoWait() { 238 Stopwatch stopwatch = Stopwatch.createStarted(); 239 BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); 240 assertTrue(queue.offer("")); 241 assertEquals("", takeUninterruptibly(queue)); 242 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 243 } 244 testTakeNoInterrupt()245 public void testTakeNoInterrupt() { 246 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(20); 247 queue.takeSuccessfully(); 248 assertNotInterrupted(); 249 } 250 testTakeSingleInterrupt()251 public void testTakeSingleInterrupt() { 252 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(50); 253 requestInterruptIn(10); 254 queue.takeSuccessfully(); 255 assertInterrupted(); 256 } 257 testTakeMultiInterrupt()258 public void testTakeMultiInterrupt() { 259 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(100); 260 repeatedlyInterruptTestThread(20, tearDownStack); 261 queue.takeSuccessfully(); 262 assertInterrupted(); 263 } 264 265 // join() tests testJoinWithNoWait()266 public void testJoinWithNoWait() throws InterruptedException { 267 Stopwatch stopwatch = Stopwatch.createStarted(); 268 Thread thread = new Thread(new JoinTarget(15)); 269 thread.start(); 270 thread.join(); 271 assertFalse(thread.isAlive()); 272 273 joinUninterruptibly(thread); 274 joinUninterruptibly(thread, 0, MILLISECONDS); 275 joinUninterruptibly(thread, -42, MILLISECONDS); 276 joinUninterruptibly(thread, LONG_DELAY_MS, MILLISECONDS); 277 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 278 } 279 testJoinNoInterrupt()280 public void testJoinNoInterrupt() { 281 TimedThread thread = TimedThread.createWithDelay(20); 282 thread.joinSuccessfully(); 283 assertNotInterrupted(); 284 } 285 testJoinTimeoutNoInterruptNotExpired()286 public void testJoinTimeoutNoInterruptNotExpired() { 287 TimedThread thread = TimedThread.createWithDelay(20); 288 thread.joinSuccessfully(LONG_DELAY_MS); 289 assertNotInterrupted(); 290 } 291 testJoinTimeoutNoInterruptExpired()292 public void testJoinTimeoutNoInterruptExpired() { 293 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 294 thread.joinUnsuccessfully(30); 295 assertNotInterrupted(); 296 } 297 testJoinSingleInterrupt()298 public void testJoinSingleInterrupt() { 299 TimedThread thread = TimedThread.createWithDelay(50); 300 requestInterruptIn(10); 301 thread.joinSuccessfully(); 302 assertInterrupted(); 303 } 304 testJoinTimeoutSingleInterruptNoExpire()305 public void testJoinTimeoutSingleInterruptNoExpire() { 306 TimedThread thread = TimedThread.createWithDelay(50); 307 requestInterruptIn(10); 308 thread.joinSuccessfully(LONG_DELAY_MS); 309 assertInterrupted(); 310 } 311 testJoinTimeoutSingleInterruptExpired()312 public void testJoinTimeoutSingleInterruptExpired() { 313 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 314 requestInterruptIn(10); 315 thread.joinUnsuccessfully(50); 316 assertInterrupted(); 317 } 318 testJoinMultiInterrupt()319 public void testJoinMultiInterrupt() { 320 TimedThread thread = TimedThread.createWithDelay(100); 321 repeatedlyInterruptTestThread(20, tearDownStack); 322 thread.joinSuccessfully(); 323 assertInterrupted(); 324 } 325 testJoinTimeoutMultiInterruptNoExpire()326 public void testJoinTimeoutMultiInterruptNoExpire() { 327 TimedThread thread = TimedThread.createWithDelay(100); 328 repeatedlyInterruptTestThread(20, tearDownStack); 329 thread.joinSuccessfully(LONG_DELAY_MS); 330 assertInterrupted(); 331 } 332 testJoinTimeoutMultiInterruptExpired()333 public void testJoinTimeoutMultiInterruptExpired() { 334 /* 335 * We don't "need" to schedule a thread completion at all here, but by doing 336 * so, we come the closest we can to testing that the wait time is 337 * appropriately decreased on each progressive join() call. 338 */ 339 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 340 repeatedlyInterruptTestThread(20, tearDownStack); 341 thread.joinUnsuccessfully(70); 342 assertInterrupted(); 343 } 344 345 // sleep() Tests testSleepNoInterrupt()346 public void testSleepNoInterrupt() { 347 sleepSuccessfully(10); 348 } 349 testSleepSingleInterrupt()350 public void testSleepSingleInterrupt() { 351 requestInterruptIn(10); 352 sleepSuccessfully(50); 353 assertInterrupted(); 354 } 355 testSleepMultiInterrupt()356 public void testSleepMultiInterrupt() { 357 repeatedlyInterruptTestThread(10, tearDownStack); 358 sleepSuccessfully(100); 359 assertInterrupted(); 360 } 361 362 // Semaphore.tryAcquire() tests testTryAcquireWithNoWait()363 public void testTryAcquireWithNoWait() { 364 Stopwatch stopwatch = Stopwatch.createStarted(); 365 Semaphore semaphore = new Semaphore(99); 366 assertTrue(tryAcquireUninterruptibly(semaphore, 0, MILLISECONDS)); 367 assertTrue(tryAcquireUninterruptibly(semaphore, -42, MILLISECONDS)); 368 assertTrue(tryAcquireUninterruptibly(semaphore, LONG_DELAY_MS, MILLISECONDS)); 369 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 370 } 371 testTryAcquireTimeoutNoInterruptNotExpired()372 public void testTryAcquireTimeoutNoInterruptNotExpired() { 373 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20); 374 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 375 assertNotInterrupted(); 376 } 377 testTryAcquireTimeoutNoInterruptExpired()378 public void testTryAcquireTimeoutNoInterruptExpired() { 379 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 380 semaphore.tryAcquireUnsuccessfully(30); 381 assertNotInterrupted(); 382 } 383 testTryAcquireTimeoutSingleInterruptNoExpire()384 public void testTryAcquireTimeoutSingleInterruptNoExpire() { 385 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50); 386 requestInterruptIn(10); 387 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 388 assertInterrupted(); 389 } 390 testTryAcquireTimeoutSingleInterruptExpired()391 public void testTryAcquireTimeoutSingleInterruptExpired() { 392 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 393 requestInterruptIn(10); 394 semaphore.tryAcquireUnsuccessfully(50); 395 assertInterrupted(); 396 } 397 testTryAcquireTimeoutMultiInterruptNoExpire()398 public void testTryAcquireTimeoutMultiInterruptNoExpire() { 399 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100); 400 repeatedlyInterruptTestThread(20, tearDownStack); 401 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 402 assertInterrupted(); 403 } 404 testTryAcquireTimeoutMultiInterruptExpired()405 public void testTryAcquireTimeoutMultiInterruptExpired() { 406 /* 407 * We don't "need" to schedule a release() call at all here, but by doing 408 * so, we come the closest we can to testing that the wait time is 409 * appropriately decreased on each progressive tryAcquire() call. 410 */ 411 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 412 repeatedlyInterruptTestThread(20, tearDownStack); 413 semaphore.tryAcquireUnsuccessfully(70); 414 assertInterrupted(); 415 } 416 testTryAcquireWithNoWaitMultiPermit()417 public void testTryAcquireWithNoWaitMultiPermit() { 418 Stopwatch stopwatch = Stopwatch.createStarted(); 419 Semaphore semaphore = new Semaphore(99); 420 assertTrue(tryAcquireUninterruptibly(semaphore, 10, 0, MILLISECONDS)); 421 assertTrue(tryAcquireUninterruptibly(semaphore, 10, -42, MILLISECONDS)); 422 assertTrue(tryAcquireUninterruptibly(semaphore, 10, LONG_DELAY_MS, MILLISECONDS)); 423 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 424 } 425 testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit()426 public void testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit() { 427 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20); 428 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 429 assertNotInterrupted(); 430 } 431 testTryAcquireTimeoutNoInterruptExpiredMultiPermit()432 public void testTryAcquireTimeoutNoInterruptExpiredMultiPermit() { 433 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 434 semaphore.tryAcquireUnsuccessfully(10, 30); 435 assertNotInterrupted(); 436 } 437 testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit()438 public void testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit() { 439 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50); 440 requestInterruptIn(10); 441 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 442 assertInterrupted(); 443 } 444 testTryAcquireTimeoutSingleInterruptExpiredMultiPermit()445 public void testTryAcquireTimeoutSingleInterruptExpiredMultiPermit() { 446 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 447 requestInterruptIn(10); 448 semaphore.tryAcquireUnsuccessfully(10, 50); 449 assertInterrupted(); 450 } 451 testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit()452 public void testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit() { 453 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100); 454 repeatedlyInterruptTestThread(20, tearDownStack); 455 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 456 assertInterrupted(); 457 } 458 testTryAcquireTimeoutMultiInterruptExpiredMultiPermit()459 public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() { 460 /* 461 * We don't "need" to schedule a release() call at all here, but by doing 462 * so, we come the closest we can to testing that the wait time is 463 * appropriately decreased on each progressive tryAcquire() call. 464 */ 465 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 466 repeatedlyInterruptTestThread(20, tearDownStack); 467 semaphore.tryAcquireUnsuccessfully(10, 70); 468 assertInterrupted(); 469 } 470 471 // executor.awaitTermination Testcases testTryAwaitTerminationUninterruptiblyDuration_success()472 public void testTryAwaitTerminationUninterruptiblyDuration_success() { 473 ExecutorService executor = newFixedThreadPool(1); 474 requestInterruptIn(500); 475 executor.execute(new SleepTask(1000)); 476 executor.shutdown(); 477 assertTrue(awaitTerminationUninterruptibly(executor, Duration.ofMillis(LONG_DELAY_MS))); 478 assertTrue(executor.isTerminated()); 479 assertInterrupted(); 480 } 481 testTryAwaitTerminationUninterruptiblyDuration_failure()482 public void testTryAwaitTerminationUninterruptiblyDuration_failure() { 483 ExecutorService executor = newFixedThreadPool(1); 484 requestInterruptIn(500); 485 executor.execute(new SleepTask(10000)); 486 executor.shutdown(); 487 assertFalse(awaitTerminationUninterruptibly(executor, Duration.ofMillis(1000))); 488 assertFalse(executor.isTerminated()); 489 assertInterrupted(); 490 } 491 testTryAwaitTerminationUninterruptiblyLongTimeUnit_success()492 public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_success() { 493 ExecutorService executor = newFixedThreadPool(1); 494 requestInterruptIn(500); 495 executor.execute(new SleepTask(1000)); 496 executor.shutdown(); 497 assertTrue(awaitTerminationUninterruptibly(executor, LONG_DELAY_MS, MILLISECONDS)); 498 assertTrue(executor.isTerminated()); 499 assertInterrupted(); 500 } 501 testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure()502 public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure() { 503 ExecutorService executor = newFixedThreadPool(1); 504 requestInterruptIn(500); 505 executor.execute(new SleepTask(10000)); 506 executor.shutdown(); 507 assertFalse(awaitTerminationUninterruptibly(executor, 1000, MILLISECONDS)); 508 assertFalse(executor.isTerminated()); 509 assertInterrupted(); 510 } 511 testTryAwaitTerminationInfiniteTimeout()512 public void testTryAwaitTerminationInfiniteTimeout() { 513 ExecutorService executor = newFixedThreadPool(1); 514 requestInterruptIn(500); 515 executor.execute(new SleepTask(1000)); 516 executor.shutdown(); 517 awaitTerminationUninterruptibly(executor); 518 assertTrue(executor.isTerminated()); 519 assertInterrupted(); 520 } 521 522 /** 523 * Wrapper around {@link Stopwatch} which also contains an "expected completion time." Creating a 524 * {@code Completion} starts the underlying stopwatch. 525 */ 526 private static final class Completion { 527 final Stopwatch stopwatch; 528 final long expectedCompletionWaitMillis; 529 Completion(long expectedCompletionWaitMillis)530 Completion(long expectedCompletionWaitMillis) { 531 this.expectedCompletionWaitMillis = expectedCompletionWaitMillis; 532 stopwatch = Stopwatch.createStarted(); 533 } 534 535 /** 536 * Asserts that the expected completion time has passed (and not "too much" time beyond that). 537 */ assertCompletionExpected()538 void assertCompletionExpected() { 539 assertAtLeastTimePassed(stopwatch, expectedCompletionWaitMillis); 540 assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis + LONG_DELAY_MS); 541 } 542 543 /** 544 * Asserts that at least {@code timeout} has passed but the expected completion time has not. 545 */ assertCompletionNotExpected(long timeout)546 void assertCompletionNotExpected(long timeout) { 547 Preconditions.checkArgument(timeout < expectedCompletionWaitMillis); 548 assertAtLeastTimePassed(stopwatch, timeout); 549 assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis); 550 } 551 } 552 553 private static void assertAtLeastTimePassed(Stopwatch stopwatch, long expectedMillis) { 554 long elapsedMillis = stopwatch.elapsed(MILLISECONDS); 555 /* 556 * The "+ 5" below is to permit, say, sleep(10) to sleep only 9 milliseconds. We see such 557 * behavior sometimes when running these tests publicly as part of Guava. "+ 5" is probably more 558 * generous than it needs to be. 559 */ 560 assertTrue( 561 "Expected elapsed millis to be >= " + expectedMillis + " but was " + elapsedMillis, 562 elapsedMillis + 5 >= expectedMillis); 563 } 564 565 // TODO(cpovirk): Split this into separate CountDownLatch and IncrementableCountDownLatch classes. 566 567 /** Manages a {@link BlockingQueue} and associated timings for a {@code put} call. */ 568 private static final class TimedPutQueue { 569 final BlockingQueue<String> queue; 570 final Completion completed; 571 572 /** 573 * Creates a {@link EnableWrites} which open up a spot for a {@code put} to succeed in {@code 574 * countdownInMillis}. 575 */ createWithDelay(long countdownInMillis)576 static TimedPutQueue createWithDelay(long countdownInMillis) { 577 return new TimedPutQueue(countdownInMillis); 578 } 579 TimedPutQueue(long countdownInMillis)580 private TimedPutQueue(long countdownInMillis) { 581 this.queue = new ArrayBlockingQueue<>(1); 582 assertTrue(queue.offer("blocksPutCallsUntilRemoved")); 583 this.completed = new Completion(countdownInMillis); 584 scheduleEnableWrites(this.queue, countdownInMillis); 585 } 586 587 /** Perform a {@code put} and assert that operation completed in the expected timeframe. */ putSuccessfully()588 void putSuccessfully() { 589 putUninterruptibly(queue, ""); 590 completed.assertCompletionExpected(); 591 assertEquals("", queue.peek()); 592 } 593 scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis)594 private static void scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis) { 595 Runnable toRun = new EnableWrites(queue, countdownInMillis); 596 // TODO(cpovirk): automatically fail the test if this thread throws 597 Thread enablerThread = new Thread(toRun); 598 enablerThread.start(); 599 } 600 } 601 602 /** Manages a {@link BlockingQueue} and associated timings for a {@code take} call. */ 603 private static final class TimedTakeQueue { 604 final BlockingQueue<String> queue; 605 final Completion completed; 606 607 /** 608 * Creates a {@link EnableReads} which insert an element for a {@code take} to receive in {@code 609 * countdownInMillis}. 610 */ createWithDelay(long countdownInMillis)611 static TimedTakeQueue createWithDelay(long countdownInMillis) { 612 return new TimedTakeQueue(countdownInMillis); 613 } 614 TimedTakeQueue(long countdownInMillis)615 private TimedTakeQueue(long countdownInMillis) { 616 this.queue = new ArrayBlockingQueue<>(1); 617 this.completed = new Completion(countdownInMillis); 618 scheduleEnableReads(this.queue, countdownInMillis); 619 } 620 621 /** Perform a {@code take} and assert that operation completed in the expected timeframe. */ takeSuccessfully()622 void takeSuccessfully() { 623 assertEquals(EXPECTED_TAKE, takeUninterruptibly(queue)); 624 completed.assertCompletionExpected(); 625 assertTrue(queue.isEmpty()); 626 } 627 scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis)628 private static void scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis) { 629 Runnable toRun = new EnableReads(queue, countdownInMillis); 630 // TODO(cpovirk): automatically fail the test if this thread throws 631 Thread enablerThread = new Thread(toRun); 632 enablerThread.start(); 633 } 634 } 635 636 /** Manages a {@link Semaphore} and associated timings. */ 637 private static final class TimedSemaphore { 638 final Semaphore semaphore; 639 final Completion completed; 640 641 /** 642 * Create a {@link Release} which will release a semaphore permit in {@code countdownInMillis}. 643 */ createWithDelay(long countdownInMillis)644 static TimedSemaphore createWithDelay(long countdownInMillis) { 645 return new TimedSemaphore(countdownInMillis); 646 } 647 TimedSemaphore(long countdownInMillis)648 private TimedSemaphore(long countdownInMillis) { 649 this.semaphore = new Semaphore(0); 650 this.completed = new Completion(countdownInMillis); 651 scheduleRelease(countdownInMillis); 652 } 653 654 /** 655 * Requests a permit from the semaphore with a timeout and asserts that operation completed in 656 * the expected timeframe. 657 */ tryAcquireSuccessfully(long timeoutMillis)658 void tryAcquireSuccessfully(long timeoutMillis) { 659 assertTrue(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS)); 660 completed.assertCompletionExpected(); 661 } 662 tryAcquireSuccessfully(int permits, long timeoutMillis)663 void tryAcquireSuccessfully(int permits, long timeoutMillis) { 664 assertTrue(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS)); 665 completed.assertCompletionExpected(); 666 } 667 668 /** 669 * Requests a permit from the semaphore with a timeout and asserts that the wait returned within 670 * the expected timeout. 671 */ tryAcquireUnsuccessfully(long timeoutMillis)672 private void tryAcquireUnsuccessfully(long timeoutMillis) { 673 assertFalse(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS)); 674 completed.assertCompletionNotExpected(timeoutMillis); 675 } 676 tryAcquireUnsuccessfully(int permits, long timeoutMillis)677 private void tryAcquireUnsuccessfully(int permits, long timeoutMillis) { 678 assertFalse(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS)); 679 completed.assertCompletionNotExpected(timeoutMillis); 680 } 681 scheduleRelease(long countdownInMillis)682 private void scheduleRelease(long countdownInMillis) { 683 DelayedActionRunnable toRun = new Release(semaphore, countdownInMillis); 684 // TODO(cpovirk): automatically fail the test if this thread throws 685 Thread releaserThread = new Thread(toRun); 686 releaserThread.start(); 687 } 688 } 689 690 private abstract static class DelayedActionRunnable implements Runnable { 691 private final long tMinus; 692 DelayedActionRunnable(long tMinus)693 protected DelayedActionRunnable(long tMinus) { 694 this.tMinus = tMinus; 695 } 696 697 @Override run()698 public final void run() { 699 try { 700 Thread.sleep(tMinus); 701 } catch (InterruptedException e) { 702 throw new AssertionError(e); 703 } 704 doAction(); 705 } 706 doAction()707 protected abstract void doAction(); 708 } 709 710 private static class CountDown extends DelayedActionRunnable { 711 private final CountDownLatch latch; 712 CountDown(CountDownLatch latch, long tMinus)713 public CountDown(CountDownLatch latch, long tMinus) { 714 super(tMinus); 715 this.latch = latch; 716 } 717 718 @Override doAction()719 protected void doAction() { 720 latch.countDown(); 721 } 722 } 723 724 private static class EnableWrites extends DelayedActionRunnable { 725 private final BlockingQueue<String> queue; 726 EnableWrites(BlockingQueue<String> queue, long tMinus)727 public EnableWrites(BlockingQueue<String> queue, long tMinus) { 728 super(tMinus); 729 assertFalse(queue.isEmpty()); 730 assertFalse(queue.offer("shouldBeRejected")); 731 this.queue = queue; 732 } 733 734 @Override doAction()735 protected void doAction() { 736 assertNotNull(queue.remove()); 737 } 738 } 739 740 private static class EnableReads extends DelayedActionRunnable { 741 private final BlockingQueue<String> queue; 742 EnableReads(BlockingQueue<String> queue, long tMinus)743 public EnableReads(BlockingQueue<String> queue, long tMinus) { 744 super(tMinus); 745 assertTrue(queue.isEmpty()); 746 this.queue = queue; 747 } 748 749 @Override doAction()750 protected void doAction() { 751 assertTrue(queue.offer(EXPECTED_TAKE)); 752 } 753 } 754 755 private static final class TimedThread { 756 private final Thread thread; 757 private final Completion completed; 758 createWithDelay(long countdownInMillis)759 static TimedThread createWithDelay(long countdownInMillis) { 760 return new TimedThread(countdownInMillis); 761 } 762 TimedThread(long expectedCompletionWaitMillis)763 private TimedThread(long expectedCompletionWaitMillis) { 764 completed = new Completion(expectedCompletionWaitMillis); 765 thread = new Thread(new JoinTarget(expectedCompletionWaitMillis)); 766 thread.start(); 767 } 768 joinSuccessfully()769 void joinSuccessfully() { 770 Uninterruptibles.joinUninterruptibly(thread); 771 completed.assertCompletionExpected(); 772 assertEquals(Thread.State.TERMINATED, thread.getState()); 773 } 774 joinSuccessfully(long timeoutMillis)775 void joinSuccessfully(long timeoutMillis) { 776 Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS); 777 completed.assertCompletionExpected(); 778 assertEquals(Thread.State.TERMINATED, thread.getState()); 779 } 780 joinUnsuccessfully(long timeoutMillis)781 void joinUnsuccessfully(long timeoutMillis) { 782 Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS); 783 completed.assertCompletionNotExpected(timeoutMillis); 784 assertFalse(Thread.State.TERMINATED.equals(thread.getState())); 785 } 786 } 787 788 private static class JoinTarget extends DelayedActionRunnable { JoinTarget(long tMinus)789 public JoinTarget(long tMinus) { 790 super(tMinus); 791 } 792 793 @Override doAction()794 protected void doAction() {} 795 } 796 797 private static class Release extends DelayedActionRunnable { 798 private final Semaphore semaphore; 799 Release(Semaphore semaphore, long tMinus)800 public Release(Semaphore semaphore, long tMinus) { 801 super(tMinus); 802 this.semaphore = semaphore; 803 } 804 805 @Override doAction()806 protected void doAction() { 807 semaphore.release(10); 808 } 809 } 810 811 private static final class SleepTask extends DelayedActionRunnable { SleepTask(long tMinus)812 SleepTask(long tMinus) { 813 super(tMinus); 814 } 815 816 @Override doAction()817 protected void doAction() {} 818 } 819 sleepSuccessfully(long sleepMillis)820 private static void sleepSuccessfully(long sleepMillis) { 821 Completion completed = new Completion(sleepMillis - SLEEP_SLACK); 822 Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS); 823 completed.assertCompletionExpected(); 824 } 825 assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis)826 private static void assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis) { 827 long elapsedMillis = stopwatch.elapsed(MILLISECONDS); 828 assertTrue(elapsedMillis < timelimitMillis); 829 } 830 831 /** 832 * Await an interrupt, then clear the interrupt status. Similar to {@code 833 * assertTrue(Thread.interrupted())} except that this version tolerates late interrupts. 834 */ 835 private static void assertInterrupted() { 836 try { 837 /* 838 * The sleep() will end immediately if we've already been interrupted or 839 * wait patiently for the interrupt if not. 840 */ 841 Thread.sleep(LONG_DELAY_MS); 842 fail("Dude, where's my interrupt?"); 843 } catch (InterruptedException expected) { 844 } 845 } 846 847 private static void assertNotInterrupted() { 848 assertFalse(Thread.interrupted()); 849 } 850 851 private static void requestInterruptIn(long millis) { 852 InterruptionUtil.requestInterruptIn(millis, MILLISECONDS); 853 } 854 855 @CanIgnoreReturnValue 856 private static Thread acquireFor(final Lock lock, final long duration, final TimeUnit unit) { 857 final CountDownLatch latch = new CountDownLatch(1); 858 Thread thread = 859 new Thread() { 860 @Override 861 public void run() { 862 lock.lock(); 863 latch.countDown(); 864 try { 865 Thread.sleep(unit.toMillis(duration)); 866 } catch (InterruptedException e) { 867 // simply finish execution 868 } finally { 869 lock.unlock(); 870 } 871 } 872 }; 873 thread.setDaemon(true); 874 thread.start(); 875 awaitUninterruptibly(latch); 876 return thread; 877 } 878 879 private static class TestCondition implements Condition { 880 private final Lock lock; 881 private final Condition condition; 882 883 private TestCondition(Lock lock, Condition condition) { 884 this.lock = lock; 885 this.condition = condition; 886 } 887 888 static TestCondition createAndSignalAfter(long delay, TimeUnit unit) { 889 final TestCondition testCondition = create(); 890 891 ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1); 892 // If signal() fails somehow, we should see a failed test, even without looking at the Future. 893 Future<?> unused = 894 scheduledPool.schedule( 895 new Runnable() { 896 @Override 897 public void run() { 898 testCondition.signal(); 899 } 900 }, 901 delay, 902 unit); 903 904 return testCondition; 905 } 906 907 static TestCondition create() { 908 Lock lock = new ReentrantLock(); 909 Condition condition = lock.newCondition(); 910 return new TestCondition(lock, condition); 911 } 912 913 @Override 914 public void await() throws InterruptedException { 915 lock.lock(); 916 try { 917 condition.await(); 918 } finally { 919 lock.unlock(); 920 } 921 } 922 923 @Override 924 public boolean await(long time, TimeUnit unit) throws InterruptedException { 925 lock.lock(); 926 try { 927 return condition.await(time, unit); 928 } finally { 929 lock.unlock(); 930 } 931 } 932 933 @Override 934 public void awaitUninterruptibly() { 935 lock.lock(); 936 try { 937 condition.awaitUninterruptibly(); 938 } finally { 939 lock.unlock(); 940 } 941 } 942 943 @Override 944 public long awaitNanos(long nanosTimeout) throws InterruptedException { 945 lock.lock(); 946 try { 947 return condition.awaitNanos(nanosTimeout); 948 } finally { 949 lock.unlock(); 950 } 951 } 952 953 @Override 954 public boolean awaitUntil(Date deadline) throws InterruptedException { 955 lock.lock(); 956 try { 957 return condition.awaitUntil(deadline); 958 } finally { 959 lock.unlock(); 960 } 961 } 962 963 @Override 964 public void signal() { 965 lock.lock(); 966 try { 967 condition.signal(); 968 } finally { 969 lock.unlock(); 970 } 971 } 972 973 @Override 974 public void signalAll() { 975 lock.lock(); 976 try { 977 condition.signalAll(); 978 } finally { 979 lock.unlock(); 980 } 981 } 982 } 983 } 984