1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not 5 * use this file except in compliance with the License. You may obtain a copy of 6 * the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations under 14 * the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread; 20 import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly; 21 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; 22 import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly; 23 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly; 24 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly; 25 import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly; 26 import static com.google.common.util.concurrent.Uninterruptibles.tryLockUninterruptibly; 27 import static java.util.concurrent.Executors.newFixedThreadPool; 28 import static java.util.concurrent.TimeUnit.MILLISECONDS; 29 import static java.util.concurrent.TimeUnit.SECONDS; 30 31 import com.google.common.base.Preconditions; 32 import com.google.common.base.Stopwatch; 33 import com.google.common.testing.NullPointerTester; 34 import com.google.common.testing.TearDown; 35 import com.google.common.testing.TearDownStack; 36 import com.google.errorprone.annotations.CanIgnoreReturnValue; 37 import java.util.Date; 38 import java.util.concurrent.ArrayBlockingQueue; 39 import java.util.concurrent.BlockingQueue; 40 import java.util.concurrent.CountDownLatch; 41 import java.util.concurrent.ExecutorService; 42 import java.util.concurrent.Executors; 43 import java.util.concurrent.Future; 44 import java.util.concurrent.ScheduledExecutorService; 45 import java.util.concurrent.Semaphore; 46 import java.util.concurrent.TimeUnit; 47 import java.util.concurrent.locks.Condition; 48 import java.util.concurrent.locks.Lock; 49 import java.util.concurrent.locks.ReentrantLock; 50 import junit.framework.TestCase; 51 52 /** 53 * Tests for {@link Uninterruptibles}. 54 * 55 * @author Anthony Zana 56 */ 57 public class UninterruptiblesTest extends TestCase { 58 private static final String EXPECTED_TAKE = "expectedTake"; 59 60 /** Timeout to use when we don't expect the timeout to expire. */ 61 private static final long LONG_DELAY_MS = 2500; 62 63 private static final long SLEEP_SLACK = 2; 64 65 private final TearDownStack tearDownStack = new TearDownStack(); 66 67 // NOTE: All durations in these tests are expressed in milliseconds 68 @Override setUp()69 protected void setUp() { 70 // Clear any previous interrupt before running the test. 71 if (Thread.currentThread().isInterrupted()) { 72 throw new AssertionError( 73 "Thread interrupted on test entry. " 74 + "Some test probably didn't clear the interrupt state"); 75 } 76 77 tearDownStack.addTearDown( 78 new TearDown() { 79 @Override 80 public void tearDown() { 81 Thread.interrupted(); 82 } 83 }); 84 } 85 86 @Override tearDown()87 protected void tearDown() { 88 tearDownStack.runTearDown(); 89 } 90 testNull()91 public void testNull() throws Exception { 92 new NullPointerTester() 93 .setDefault(CountDownLatch.class, new CountDownLatch(0)) 94 .setDefault(Semaphore.class, new Semaphore(999)) 95 .testAllPublicStaticMethods(Uninterruptibles.class); 96 } 97 98 // IncrementableCountDownLatch.await() tests 99 100 // CountDownLatch.await() tests 101 102 // Condition.await() tests testConditionAwaitTimeoutExceeded()103 public void testConditionAwaitTimeoutExceeded() { 104 Stopwatch stopwatch = Stopwatch.createStarted(); 105 Condition condition = TestCondition.create(); 106 107 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 500, MILLISECONDS); 108 109 assertFalse(signaledBeforeTimeout); 110 assertAtLeastTimePassed(stopwatch, 500); 111 assertNotInterrupted(); 112 } 113 testConditionAwaitTimeoutNotExceeded()114 public void testConditionAwaitTimeoutNotExceeded() { 115 Stopwatch stopwatch = Stopwatch.createStarted(); 116 Condition condition = TestCondition.createAndSignalAfter(500, MILLISECONDS); 117 118 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS); 119 120 assertTrue(signaledBeforeTimeout); 121 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 122 assertNotInterrupted(); 123 } 124 testConditionAwaitInterruptedTimeoutExceeded()125 public void testConditionAwaitInterruptedTimeoutExceeded() { 126 Stopwatch stopwatch = Stopwatch.createStarted(); 127 Condition condition = TestCondition.create(); 128 requestInterruptIn(500); 129 130 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1000, MILLISECONDS); 131 132 assertFalse(signaledBeforeTimeout); 133 assertAtLeastTimePassed(stopwatch, 1000); 134 assertInterrupted(); 135 } 136 testConditionAwaitInterruptedTimeoutNotExceeded()137 public void testConditionAwaitInterruptedTimeoutNotExceeded() { 138 Stopwatch stopwatch = Stopwatch.createStarted(); 139 Condition condition = TestCondition.createAndSignalAfter(1000, MILLISECONDS); 140 requestInterruptIn(500); 141 142 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS); 143 144 assertTrue(signaledBeforeTimeout); 145 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 146 assertInterrupted(); 147 } 148 149 // Lock.tryLock() tests testTryLockTimeoutExceeded()150 public void testTryLockTimeoutExceeded() { 151 Stopwatch stopwatch = Stopwatch.createStarted(); 152 Lock lock = new ReentrantLock(); 153 Thread lockThread = acquireFor(lock, 5, SECONDS); 154 155 boolean lockAcquired = tryLockUninterruptibly(lock, 500, MILLISECONDS); 156 157 assertFalse(lockAcquired); 158 assertAtLeastTimePassed(stopwatch, 500); 159 assertNotInterrupted(); 160 161 // finish locking thread 162 lockThread.interrupt(); 163 } 164 testTryLockTimeoutNotExceeded()165 public void testTryLockTimeoutNotExceeded() { 166 Stopwatch stopwatch = Stopwatch.createStarted(); 167 Lock lock = new ReentrantLock(); 168 acquireFor(lock, 500, MILLISECONDS); 169 170 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS); 171 172 assertTrue(signaledBeforeTimeout); 173 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 174 assertNotInterrupted(); 175 } 176 testTryLockInterruptedTimeoutExceeded()177 public void testTryLockInterruptedTimeoutExceeded() { 178 Stopwatch stopwatch = Stopwatch.createStarted(); 179 Lock lock = new ReentrantLock(); 180 Thread lockThread = acquireFor(lock, 5, SECONDS); 181 requestInterruptIn(500); 182 183 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1000, MILLISECONDS); 184 185 assertFalse(signaledBeforeTimeout); 186 assertAtLeastTimePassed(stopwatch, 1000); 187 assertInterrupted(); 188 189 // finish locking thread 190 lockThread.interrupt(); 191 } 192 testTryLockInterruptedTimeoutNotExceeded()193 public void testTryLockInterruptedTimeoutNotExceeded() { 194 Stopwatch stopwatch = Stopwatch.createStarted(); 195 Lock lock = new ReentrantLock(); 196 acquireFor(lock, 1000, MILLISECONDS); 197 requestInterruptIn(500); 198 199 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS); 200 201 assertTrue(signaledBeforeTimeout); 202 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 203 assertInterrupted(); 204 } 205 206 // BlockingQueue.put() tests testPutWithNoWait()207 public void testPutWithNoWait() { 208 Stopwatch stopwatch = Stopwatch.createStarted(); 209 BlockingQueue<String> queue = new ArrayBlockingQueue<>(999); 210 putUninterruptibly(queue, ""); 211 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 212 assertEquals("", queue.peek()); 213 } 214 testPutNoInterrupt()215 public void testPutNoInterrupt() { 216 TimedPutQueue queue = TimedPutQueue.createWithDelay(20); 217 queue.putSuccessfully(); 218 assertNotInterrupted(); 219 } 220 testPutSingleInterrupt()221 public void testPutSingleInterrupt() { 222 TimedPutQueue queue = TimedPutQueue.createWithDelay(50); 223 requestInterruptIn(10); 224 queue.putSuccessfully(); 225 assertInterrupted(); 226 } 227 testPutMultiInterrupt()228 public void testPutMultiInterrupt() { 229 TimedPutQueue queue = TimedPutQueue.createWithDelay(100); 230 repeatedlyInterruptTestThread(20, tearDownStack); 231 queue.putSuccessfully(); 232 assertInterrupted(); 233 } 234 235 // BlockingQueue.take() tests testTakeWithNoWait()236 public void testTakeWithNoWait() { 237 Stopwatch stopwatch = Stopwatch.createStarted(); 238 BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); 239 assertTrue(queue.offer("")); 240 assertEquals("", takeUninterruptibly(queue)); 241 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 242 } 243 testTakeNoInterrupt()244 public void testTakeNoInterrupt() { 245 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(20); 246 queue.takeSuccessfully(); 247 assertNotInterrupted(); 248 } 249 testTakeSingleInterrupt()250 public void testTakeSingleInterrupt() { 251 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(50); 252 requestInterruptIn(10); 253 queue.takeSuccessfully(); 254 assertInterrupted(); 255 } 256 testTakeMultiInterrupt()257 public void testTakeMultiInterrupt() { 258 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(100); 259 repeatedlyInterruptTestThread(20, tearDownStack); 260 queue.takeSuccessfully(); 261 assertInterrupted(); 262 } 263 264 // join() tests testJoinWithNoWait()265 public void testJoinWithNoWait() throws InterruptedException { 266 Stopwatch stopwatch = Stopwatch.createStarted(); 267 Thread thread = new Thread(new JoinTarget(15)); 268 thread.start(); 269 thread.join(); 270 assertFalse(thread.isAlive()); 271 272 joinUninterruptibly(thread); 273 joinUninterruptibly(thread, 0, MILLISECONDS); 274 joinUninterruptibly(thread, -42, MILLISECONDS); 275 joinUninterruptibly(thread, LONG_DELAY_MS, MILLISECONDS); 276 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 277 } 278 testJoinNoInterrupt()279 public void testJoinNoInterrupt() { 280 TimedThread thread = TimedThread.createWithDelay(20); 281 thread.joinSuccessfully(); 282 assertNotInterrupted(); 283 } 284 testJoinTimeoutNoInterruptNotExpired()285 public void testJoinTimeoutNoInterruptNotExpired() { 286 TimedThread thread = TimedThread.createWithDelay(20); 287 thread.joinSuccessfully(LONG_DELAY_MS); 288 assertNotInterrupted(); 289 } 290 testJoinTimeoutNoInterruptExpired()291 public void testJoinTimeoutNoInterruptExpired() { 292 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 293 thread.joinUnsuccessfully(30); 294 assertNotInterrupted(); 295 } 296 testJoinSingleInterrupt()297 public void testJoinSingleInterrupt() { 298 TimedThread thread = TimedThread.createWithDelay(50); 299 requestInterruptIn(10); 300 thread.joinSuccessfully(); 301 assertInterrupted(); 302 } 303 testJoinTimeoutSingleInterruptNoExpire()304 public void testJoinTimeoutSingleInterruptNoExpire() { 305 TimedThread thread = TimedThread.createWithDelay(50); 306 requestInterruptIn(10); 307 thread.joinSuccessfully(LONG_DELAY_MS); 308 assertInterrupted(); 309 } 310 testJoinTimeoutSingleInterruptExpired()311 public void testJoinTimeoutSingleInterruptExpired() { 312 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 313 requestInterruptIn(10); 314 thread.joinUnsuccessfully(50); 315 assertInterrupted(); 316 } 317 testJoinMultiInterrupt()318 public void testJoinMultiInterrupt() { 319 TimedThread thread = TimedThread.createWithDelay(100); 320 repeatedlyInterruptTestThread(20, tearDownStack); 321 thread.joinSuccessfully(); 322 assertInterrupted(); 323 } 324 testJoinTimeoutMultiInterruptNoExpire()325 public void testJoinTimeoutMultiInterruptNoExpire() { 326 TimedThread thread = TimedThread.createWithDelay(100); 327 repeatedlyInterruptTestThread(20, tearDownStack); 328 thread.joinSuccessfully(LONG_DELAY_MS); 329 assertInterrupted(); 330 } 331 testJoinTimeoutMultiInterruptExpired()332 public void testJoinTimeoutMultiInterruptExpired() { 333 /* 334 * We don't "need" to schedule a thread completion at all here, but by doing 335 * so, we come the closest we can to testing that the wait time is 336 * appropriately decreased on each progressive join() call. 337 */ 338 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 339 repeatedlyInterruptTestThread(20, tearDownStack); 340 thread.joinUnsuccessfully(70); 341 assertInterrupted(); 342 } 343 344 // sleep() Tests testSleepNoInterrupt()345 public void testSleepNoInterrupt() { 346 sleepSuccessfully(10); 347 } 348 testSleepSingleInterrupt()349 public void testSleepSingleInterrupt() { 350 requestInterruptIn(10); 351 sleepSuccessfully(50); 352 assertInterrupted(); 353 } 354 testSleepMultiInterrupt()355 public void testSleepMultiInterrupt() { 356 repeatedlyInterruptTestThread(10, tearDownStack); 357 sleepSuccessfully(100); 358 assertInterrupted(); 359 } 360 361 // Semaphore.tryAcquire() tests testTryAcquireWithNoWait()362 public void testTryAcquireWithNoWait() { 363 Stopwatch stopwatch = Stopwatch.createStarted(); 364 Semaphore semaphore = new Semaphore(99); 365 assertTrue(tryAcquireUninterruptibly(semaphore, 0, MILLISECONDS)); 366 assertTrue(tryAcquireUninterruptibly(semaphore, -42, MILLISECONDS)); 367 assertTrue(tryAcquireUninterruptibly(semaphore, LONG_DELAY_MS, MILLISECONDS)); 368 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 369 } 370 testTryAcquireTimeoutNoInterruptNotExpired()371 public void testTryAcquireTimeoutNoInterruptNotExpired() { 372 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20); 373 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 374 assertNotInterrupted(); 375 } 376 testTryAcquireTimeoutNoInterruptExpired()377 public void testTryAcquireTimeoutNoInterruptExpired() { 378 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 379 semaphore.tryAcquireUnsuccessfully(30); 380 assertNotInterrupted(); 381 } 382 testTryAcquireTimeoutSingleInterruptNoExpire()383 public void testTryAcquireTimeoutSingleInterruptNoExpire() { 384 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50); 385 requestInterruptIn(10); 386 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 387 assertInterrupted(); 388 } 389 testTryAcquireTimeoutSingleInterruptExpired()390 public void testTryAcquireTimeoutSingleInterruptExpired() { 391 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 392 requestInterruptIn(10); 393 semaphore.tryAcquireUnsuccessfully(50); 394 assertInterrupted(); 395 } 396 testTryAcquireTimeoutMultiInterruptNoExpire()397 public void testTryAcquireTimeoutMultiInterruptNoExpire() { 398 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100); 399 repeatedlyInterruptTestThread(20, tearDownStack); 400 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 401 assertInterrupted(); 402 } 403 testTryAcquireTimeoutMultiInterruptExpired()404 public void testTryAcquireTimeoutMultiInterruptExpired() { 405 /* 406 * We don't "need" to schedule a release() call at all here, but by doing 407 * so, we come the closest we can to testing that the wait time is 408 * appropriately decreased on each progressive tryAcquire() call. 409 */ 410 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 411 repeatedlyInterruptTestThread(20, tearDownStack); 412 semaphore.tryAcquireUnsuccessfully(70); 413 assertInterrupted(); 414 } 415 testTryAcquireWithNoWaitMultiPermit()416 public void testTryAcquireWithNoWaitMultiPermit() { 417 Stopwatch stopwatch = Stopwatch.createStarted(); 418 Semaphore semaphore = new Semaphore(99); 419 assertTrue(tryAcquireUninterruptibly(semaphore, 10, 0, MILLISECONDS)); 420 assertTrue(tryAcquireUninterruptibly(semaphore, 10, -42, MILLISECONDS)); 421 assertTrue(tryAcquireUninterruptibly(semaphore, 10, LONG_DELAY_MS, MILLISECONDS)); 422 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 423 } 424 testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit()425 public void testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit() { 426 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20); 427 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 428 assertNotInterrupted(); 429 } 430 testTryAcquireTimeoutNoInterruptExpiredMultiPermit()431 public void testTryAcquireTimeoutNoInterruptExpiredMultiPermit() { 432 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 433 semaphore.tryAcquireUnsuccessfully(10, 30); 434 assertNotInterrupted(); 435 } 436 testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit()437 public void testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit() { 438 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50); 439 requestInterruptIn(10); 440 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 441 assertInterrupted(); 442 } 443 testTryAcquireTimeoutSingleInterruptExpiredMultiPermit()444 public void testTryAcquireTimeoutSingleInterruptExpiredMultiPermit() { 445 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 446 requestInterruptIn(10); 447 semaphore.tryAcquireUnsuccessfully(10, 50); 448 assertInterrupted(); 449 } 450 testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit()451 public void testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit() { 452 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100); 453 repeatedlyInterruptTestThread(20, tearDownStack); 454 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 455 assertInterrupted(); 456 } 457 testTryAcquireTimeoutMultiInterruptExpiredMultiPermit()458 public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() { 459 /* 460 * We don't "need" to schedule a release() call at all here, but by doing 461 * so, we come the closest we can to testing that the wait time is 462 * appropriately decreased on each progressive tryAcquire() call. 463 */ 464 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 465 repeatedlyInterruptTestThread(20, tearDownStack); 466 semaphore.tryAcquireUnsuccessfully(10, 70); 467 assertInterrupted(); 468 } 469 470 // executor.awaitTermination Testcases testTryAwaitTerminationUninterruptiblyLongTimeUnit_success()471 public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_success() { 472 ExecutorService executor = newFixedThreadPool(1); 473 requestInterruptIn(500); 474 executor.execute(new SleepTask(1000)); 475 executor.shutdown(); 476 assertTrue(awaitTerminationUninterruptibly(executor, LONG_DELAY_MS, MILLISECONDS)); 477 assertTrue(executor.isTerminated()); 478 assertInterrupted(); 479 } 480 testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure()481 public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure() { 482 ExecutorService executor = newFixedThreadPool(1); 483 requestInterruptIn(500); 484 executor.execute(new SleepTask(10000)); 485 executor.shutdown(); 486 assertFalse(awaitTerminationUninterruptibly(executor, 1000, MILLISECONDS)); 487 assertFalse(executor.isTerminated()); 488 assertInterrupted(); 489 } 490 testTryAwaitTerminationInfiniteTimeout()491 public void testTryAwaitTerminationInfiniteTimeout() { 492 ExecutorService executor = newFixedThreadPool(1); 493 requestInterruptIn(500); 494 executor.execute(new SleepTask(1000)); 495 executor.shutdown(); 496 awaitTerminationUninterruptibly(executor); 497 assertTrue(executor.isTerminated()); 498 assertInterrupted(); 499 } 500 501 /** 502 * Wrapper around {@link Stopwatch} which also contains an "expected completion time." Creating a 503 * {@code Completion} starts the underlying stopwatch. 504 */ 505 private static final class Completion { 506 final Stopwatch stopwatch; 507 final long expectedCompletionWaitMillis; 508 Completion(long expectedCompletionWaitMillis)509 Completion(long expectedCompletionWaitMillis) { 510 this.expectedCompletionWaitMillis = expectedCompletionWaitMillis; 511 stopwatch = Stopwatch.createStarted(); 512 } 513 514 /** 515 * Asserts that the expected completion time has passed (and not "too much" time beyond that). 516 */ assertCompletionExpected()517 void assertCompletionExpected() { 518 assertAtLeastTimePassed(stopwatch, expectedCompletionWaitMillis); 519 assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis + LONG_DELAY_MS); 520 } 521 522 /** 523 * Asserts that at least {@code timeout} has passed but the expected completion time has not. 524 */ assertCompletionNotExpected(long timeout)525 void assertCompletionNotExpected(long timeout) { 526 Preconditions.checkArgument(timeout < expectedCompletionWaitMillis); 527 assertAtLeastTimePassed(stopwatch, timeout); 528 assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis); 529 } 530 } 531 532 private static void assertAtLeastTimePassed(Stopwatch stopwatch, long expectedMillis) { 533 long elapsedMillis = stopwatch.elapsed(MILLISECONDS); 534 /* 535 * The "+ 5" below is to permit, say, sleep(10) to sleep only 9 milliseconds. We see such 536 * behavior sometimes when running these tests publicly as part of Guava. "+ 5" is probably more 537 * generous than it needs to be. 538 */ 539 assertTrue( 540 "Expected elapsed millis to be >= " + expectedMillis + " but was " + elapsedMillis, 541 elapsedMillis + 5 >= expectedMillis); 542 } 543 544 // TODO(cpovirk): Split this into separate CountDownLatch and IncrementableCountDownLatch classes. 545 546 /** Manages a {@link BlockingQueue} and associated timings for a {@code put} call. */ 547 private static final class TimedPutQueue { 548 final BlockingQueue<String> queue; 549 final Completion completed; 550 551 /** 552 * Creates a {@link EnableWrites} which open up a spot for a {@code put} to succeed in {@code 553 * countdownInMillis}. 554 */ createWithDelay(long countdownInMillis)555 static TimedPutQueue createWithDelay(long countdownInMillis) { 556 return new TimedPutQueue(countdownInMillis); 557 } 558 TimedPutQueue(long countdownInMillis)559 private TimedPutQueue(long countdownInMillis) { 560 this.queue = new ArrayBlockingQueue<>(1); 561 assertTrue(queue.offer("blocksPutCallsUntilRemoved")); 562 this.completed = new Completion(countdownInMillis); 563 scheduleEnableWrites(this.queue, countdownInMillis); 564 } 565 566 /** Perform a {@code put} and assert that operation completed in the expected timeframe. */ putSuccessfully()567 void putSuccessfully() { 568 putUninterruptibly(queue, ""); 569 completed.assertCompletionExpected(); 570 assertEquals("", queue.peek()); 571 } 572 scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis)573 private static void scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis) { 574 Runnable toRun = new EnableWrites(queue, countdownInMillis); 575 // TODO(cpovirk): automatically fail the test if this thread throws 576 Thread enablerThread = new Thread(toRun); 577 enablerThread.start(); 578 } 579 } 580 581 /** Manages a {@link BlockingQueue} and associated timings for a {@code take} call. */ 582 private static final class TimedTakeQueue { 583 final BlockingQueue<String> queue; 584 final Completion completed; 585 586 /** 587 * Creates a {@link EnableReads} which insert an element for a {@code take} to receive in {@code 588 * countdownInMillis}. 589 */ createWithDelay(long countdownInMillis)590 static TimedTakeQueue createWithDelay(long countdownInMillis) { 591 return new TimedTakeQueue(countdownInMillis); 592 } 593 TimedTakeQueue(long countdownInMillis)594 private TimedTakeQueue(long countdownInMillis) { 595 this.queue = new ArrayBlockingQueue<>(1); 596 this.completed = new Completion(countdownInMillis); 597 scheduleEnableReads(this.queue, countdownInMillis); 598 } 599 600 /** Perform a {@code take} and assert that operation completed in the expected timeframe. */ takeSuccessfully()601 void takeSuccessfully() { 602 assertEquals(EXPECTED_TAKE, takeUninterruptibly(queue)); 603 completed.assertCompletionExpected(); 604 assertTrue(queue.isEmpty()); 605 } 606 scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis)607 private static void scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis) { 608 Runnable toRun = new EnableReads(queue, countdownInMillis); 609 // TODO(cpovirk): automatically fail the test if this thread throws 610 Thread enablerThread = new Thread(toRun); 611 enablerThread.start(); 612 } 613 } 614 615 /** Manages a {@link Semaphore} and associated timings. */ 616 private static final class TimedSemaphore { 617 final Semaphore semaphore; 618 final Completion completed; 619 620 /** 621 * Create a {@link Release} which will release a semaphore permit in {@code countdownInMillis}. 622 */ createWithDelay(long countdownInMillis)623 static TimedSemaphore createWithDelay(long countdownInMillis) { 624 return new TimedSemaphore(countdownInMillis); 625 } 626 TimedSemaphore(long countdownInMillis)627 private TimedSemaphore(long countdownInMillis) { 628 this.semaphore = new Semaphore(0); 629 this.completed = new Completion(countdownInMillis); 630 scheduleRelease(countdownInMillis); 631 } 632 633 /** 634 * Requests a permit from the semaphore with a timeout and asserts that operation completed in 635 * the expected timeframe. 636 */ tryAcquireSuccessfully(long timeoutMillis)637 void tryAcquireSuccessfully(long timeoutMillis) { 638 assertTrue(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS)); 639 completed.assertCompletionExpected(); 640 } 641 tryAcquireSuccessfully(int permits, long timeoutMillis)642 void tryAcquireSuccessfully(int permits, long timeoutMillis) { 643 assertTrue(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS)); 644 completed.assertCompletionExpected(); 645 } 646 647 /** 648 * Requests a permit from the semaphore with a timeout and asserts that the wait returned within 649 * the expected timeout. 650 */ tryAcquireUnsuccessfully(long timeoutMillis)651 private void tryAcquireUnsuccessfully(long timeoutMillis) { 652 assertFalse(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS)); 653 completed.assertCompletionNotExpected(timeoutMillis); 654 } 655 tryAcquireUnsuccessfully(int permits, long timeoutMillis)656 private void tryAcquireUnsuccessfully(int permits, long timeoutMillis) { 657 assertFalse(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS)); 658 completed.assertCompletionNotExpected(timeoutMillis); 659 } 660 scheduleRelease(long countdownInMillis)661 private void scheduleRelease(long countdownInMillis) { 662 DelayedActionRunnable toRun = new Release(semaphore, countdownInMillis); 663 // TODO(cpovirk): automatically fail the test if this thread throws 664 Thread releaserThread = new Thread(toRun); 665 releaserThread.start(); 666 } 667 } 668 669 private abstract static class DelayedActionRunnable implements Runnable { 670 private final long tMinus; 671 DelayedActionRunnable(long tMinus)672 protected DelayedActionRunnable(long tMinus) { 673 this.tMinus = tMinus; 674 } 675 676 @Override run()677 public final void run() { 678 try { 679 Thread.sleep(tMinus); 680 } catch (InterruptedException e) { 681 throw new AssertionError(e); 682 } 683 doAction(); 684 } 685 doAction()686 protected abstract void doAction(); 687 } 688 689 private static class CountDown extends DelayedActionRunnable { 690 private final CountDownLatch latch; 691 CountDown(CountDownLatch latch, long tMinus)692 public CountDown(CountDownLatch latch, long tMinus) { 693 super(tMinus); 694 this.latch = latch; 695 } 696 697 @Override doAction()698 protected void doAction() { 699 latch.countDown(); 700 } 701 } 702 703 private static class EnableWrites extends DelayedActionRunnable { 704 private final BlockingQueue<String> queue; 705 EnableWrites(BlockingQueue<String> queue, long tMinus)706 public EnableWrites(BlockingQueue<String> queue, long tMinus) { 707 super(tMinus); 708 assertFalse(queue.isEmpty()); 709 assertFalse(queue.offer("shouldBeRejected")); 710 this.queue = queue; 711 } 712 713 @Override doAction()714 protected void doAction() { 715 assertNotNull(queue.remove()); 716 } 717 } 718 719 private static class EnableReads extends DelayedActionRunnable { 720 private final BlockingQueue<String> queue; 721 EnableReads(BlockingQueue<String> queue, long tMinus)722 public EnableReads(BlockingQueue<String> queue, long tMinus) { 723 super(tMinus); 724 assertTrue(queue.isEmpty()); 725 this.queue = queue; 726 } 727 728 @Override doAction()729 protected void doAction() { 730 assertTrue(queue.offer(EXPECTED_TAKE)); 731 } 732 } 733 734 private static final class TimedThread { 735 private final Thread thread; 736 private final Completion completed; 737 createWithDelay(long countdownInMillis)738 static TimedThread createWithDelay(long countdownInMillis) { 739 return new TimedThread(countdownInMillis); 740 } 741 TimedThread(long expectedCompletionWaitMillis)742 private TimedThread(long expectedCompletionWaitMillis) { 743 completed = new Completion(expectedCompletionWaitMillis); 744 thread = new Thread(new JoinTarget(expectedCompletionWaitMillis)); 745 thread.start(); 746 } 747 joinSuccessfully()748 void joinSuccessfully() { 749 Uninterruptibles.joinUninterruptibly(thread); 750 completed.assertCompletionExpected(); 751 assertEquals(Thread.State.TERMINATED, thread.getState()); 752 } 753 joinSuccessfully(long timeoutMillis)754 void joinSuccessfully(long timeoutMillis) { 755 Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS); 756 completed.assertCompletionExpected(); 757 assertEquals(Thread.State.TERMINATED, thread.getState()); 758 } 759 joinUnsuccessfully(long timeoutMillis)760 void joinUnsuccessfully(long timeoutMillis) { 761 Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS); 762 completed.assertCompletionNotExpected(timeoutMillis); 763 assertFalse(Thread.State.TERMINATED.equals(thread.getState())); 764 } 765 } 766 767 private static class JoinTarget extends DelayedActionRunnable { JoinTarget(long tMinus)768 public JoinTarget(long tMinus) { 769 super(tMinus); 770 } 771 772 @Override doAction()773 protected void doAction() {} 774 } 775 776 private static class Release extends DelayedActionRunnable { 777 private final Semaphore semaphore; 778 Release(Semaphore semaphore, long tMinus)779 public Release(Semaphore semaphore, long tMinus) { 780 super(tMinus); 781 this.semaphore = semaphore; 782 } 783 784 @Override doAction()785 protected void doAction() { 786 semaphore.release(10); 787 } 788 } 789 790 private static final class SleepTask extends DelayedActionRunnable { SleepTask(long tMinus)791 SleepTask(long tMinus) { 792 super(tMinus); 793 } 794 795 @Override doAction()796 protected void doAction() {} 797 } 798 sleepSuccessfully(long sleepMillis)799 private static void sleepSuccessfully(long sleepMillis) { 800 Completion completed = new Completion(sleepMillis - SLEEP_SLACK); 801 Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS); 802 completed.assertCompletionExpected(); 803 } 804 assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis)805 private static void assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis) { 806 long elapsedMillis = stopwatch.elapsed(MILLISECONDS); 807 assertTrue(elapsedMillis < timelimitMillis); 808 } 809 810 /** 811 * Await an interrupt, then clear the interrupt status. Similar to {@code 812 * assertTrue(Thread.interrupted())} except that this version tolerates late interrupts. 813 */ 814 private static void assertInterrupted() { 815 try { 816 /* 817 * The sleep() will end immediately if we've already been interrupted or 818 * wait patiently for the interrupt if not. 819 */ 820 Thread.sleep(LONG_DELAY_MS); 821 fail("Dude, where's my interrupt?"); 822 } catch (InterruptedException expected) { 823 } 824 } 825 826 private static void assertNotInterrupted() { 827 assertFalse(Thread.interrupted()); 828 } 829 830 private static void requestInterruptIn(long millis) { 831 InterruptionUtil.requestInterruptIn(millis, MILLISECONDS); 832 } 833 834 @CanIgnoreReturnValue 835 private static Thread acquireFor(final Lock lock, final long duration, final TimeUnit unit) { 836 final CountDownLatch latch = new CountDownLatch(1); 837 Thread thread = 838 new Thread() { 839 @Override 840 public void run() { 841 lock.lock(); 842 latch.countDown(); 843 try { 844 Thread.sleep(unit.toMillis(duration)); 845 } catch (InterruptedException e) { 846 // simply finish execution 847 } finally { 848 lock.unlock(); 849 } 850 } 851 }; 852 thread.setDaemon(true); 853 thread.start(); 854 awaitUninterruptibly(latch); 855 return thread; 856 } 857 858 private static class TestCondition implements Condition { 859 private final Lock lock; 860 private final Condition condition; 861 862 private TestCondition(Lock lock, Condition condition) { 863 this.lock = lock; 864 this.condition = condition; 865 } 866 867 static TestCondition createAndSignalAfter(long delay, TimeUnit unit) { 868 final TestCondition testCondition = create(); 869 870 ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1); 871 // If signal() fails somehow, we should see a failed test, even without looking at the Future. 872 Future<?> unused = 873 scheduledPool.schedule( 874 new Runnable() { 875 @Override 876 public void run() { 877 testCondition.signal(); 878 } 879 }, 880 delay, 881 unit); 882 883 return testCondition; 884 } 885 886 static TestCondition create() { 887 Lock lock = new ReentrantLock(); 888 Condition condition = lock.newCondition(); 889 return new TestCondition(lock, condition); 890 } 891 892 @Override 893 public void await() throws InterruptedException { 894 lock.lock(); 895 try { 896 condition.await(); 897 } finally { 898 lock.unlock(); 899 } 900 } 901 902 @Override 903 public boolean await(long time, TimeUnit unit) throws InterruptedException { 904 lock.lock(); 905 try { 906 return condition.await(time, unit); 907 } finally { 908 lock.unlock(); 909 } 910 } 911 912 @Override 913 public void awaitUninterruptibly() { 914 lock.lock(); 915 try { 916 condition.awaitUninterruptibly(); 917 } finally { 918 lock.unlock(); 919 } 920 } 921 922 @Override 923 public long awaitNanos(long nanosTimeout) throws InterruptedException { 924 lock.lock(); 925 try { 926 return condition.awaitNanos(nanosTimeout); 927 } finally { 928 lock.unlock(); 929 } 930 } 931 932 @Override 933 public boolean awaitUntil(Date deadline) throws InterruptedException { 934 lock.lock(); 935 try { 936 return condition.awaitUntil(deadline); 937 } finally { 938 lock.unlock(); 939 } 940 } 941 942 @Override 943 public void signal() { 944 lock.lock(); 945 try { 946 condition.signal(); 947 } finally { 948 lock.unlock(); 949 } 950 } 951 952 @Override 953 public void signalAll() { 954 lock.lock(); 955 try { 956 condition.signalAll(); 957 } finally { 958 lock.unlock(); 959 } 960 } 961 } 962 } 963