• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  * use this file except in compliance with the License. You may obtain a copy of
6  * the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations under
14  * the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread;
20 import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly;
21 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
22 import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly;
23 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
24 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
25 import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly;
26 import static com.google.common.util.concurrent.Uninterruptibles.tryLockUninterruptibly;
27 import static java.util.concurrent.Executors.newFixedThreadPool;
28 import static java.util.concurrent.TimeUnit.MILLISECONDS;
29 import static java.util.concurrent.TimeUnit.SECONDS;
30 
31 import com.google.common.base.Preconditions;
32 import com.google.common.base.Stopwatch;
33 import com.google.common.testing.NullPointerTester;
34 import com.google.common.testing.TearDown;
35 import com.google.common.testing.TearDownStack;
36 import com.google.errorprone.annotations.CanIgnoreReturnValue;
37 import java.util.Date;
38 import java.util.concurrent.ArrayBlockingQueue;
39 import java.util.concurrent.BlockingQueue;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.concurrent.Semaphore;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.locks.Condition;
48 import java.util.concurrent.locks.Lock;
49 import java.util.concurrent.locks.ReentrantLock;
50 import junit.framework.TestCase;
51 
52 /**
53  * Tests for {@link Uninterruptibles}.
54  *
55  * @author Anthony Zana
56  */
57 public class UninterruptiblesTest extends TestCase {
58   private static final String EXPECTED_TAKE = "expectedTake";
59 
60   /** Timeout to use when we don't expect the timeout to expire. */
61   private static final long LONG_DELAY_MS = 2500;
62 
63   private static final long SLEEP_SLACK = 2;
64 
65   private final TearDownStack tearDownStack = new TearDownStack();
66 
67   // NOTE: All durations in these tests are expressed in milliseconds
68   @Override
setUp()69   protected void setUp() {
70     // Clear any previous interrupt before running the test.
71     if (Thread.currentThread().isInterrupted()) {
72       throw new AssertionError(
73           "Thread interrupted on test entry. "
74               + "Some test probably didn't clear the interrupt state");
75     }
76 
77     tearDownStack.addTearDown(
78         new TearDown() {
79           @Override
80           public void tearDown() {
81             Thread.interrupted();
82           }
83         });
84   }
85 
86   @Override
tearDown()87   protected void tearDown() {
88     tearDownStack.runTearDown();
89   }
90 
testNull()91   public void testNull() throws Exception {
92     new NullPointerTester()
93         .setDefault(CountDownLatch.class, new CountDownLatch(0))
94         .setDefault(Semaphore.class, new Semaphore(999))
95         .testAllPublicStaticMethods(Uninterruptibles.class);
96   }
97 
98   // IncrementableCountDownLatch.await() tests
99 
100   // CountDownLatch.await() tests
101 
102   // Condition.await() tests
testConditionAwaitTimeoutExceeded()103   public void testConditionAwaitTimeoutExceeded() {
104     Stopwatch stopwatch = Stopwatch.createStarted();
105     Condition condition = TestCondition.create();
106 
107     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 500, MILLISECONDS);
108 
109     assertFalse(signaledBeforeTimeout);
110     assertAtLeastTimePassed(stopwatch, 500);
111     assertNotInterrupted();
112   }
113 
testConditionAwaitTimeoutNotExceeded()114   public void testConditionAwaitTimeoutNotExceeded() {
115     Stopwatch stopwatch = Stopwatch.createStarted();
116     Condition condition = TestCondition.createAndSignalAfter(500, MILLISECONDS);
117 
118     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS);
119 
120     assertTrue(signaledBeforeTimeout);
121     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
122     assertNotInterrupted();
123   }
124 
testConditionAwaitInterruptedTimeoutExceeded()125   public void testConditionAwaitInterruptedTimeoutExceeded() {
126     Stopwatch stopwatch = Stopwatch.createStarted();
127     Condition condition = TestCondition.create();
128     requestInterruptIn(500);
129 
130     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1000, MILLISECONDS);
131 
132     assertFalse(signaledBeforeTimeout);
133     assertAtLeastTimePassed(stopwatch, 1000);
134     assertInterrupted();
135   }
136 
testConditionAwaitInterruptedTimeoutNotExceeded()137   public void testConditionAwaitInterruptedTimeoutNotExceeded() {
138     Stopwatch stopwatch = Stopwatch.createStarted();
139     Condition condition = TestCondition.createAndSignalAfter(1000, MILLISECONDS);
140     requestInterruptIn(500);
141 
142     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS);
143 
144     assertTrue(signaledBeforeTimeout);
145     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
146     assertInterrupted();
147   }
148 
149   // Lock.tryLock() tests
testTryLockTimeoutExceeded()150   public void testTryLockTimeoutExceeded() {
151     Stopwatch stopwatch = Stopwatch.createStarted();
152     Lock lock = new ReentrantLock();
153     Thread lockThread = acquireFor(lock, 5, SECONDS);
154 
155     boolean lockAcquired = tryLockUninterruptibly(lock, 500, MILLISECONDS);
156 
157     assertFalse(lockAcquired);
158     assertAtLeastTimePassed(stopwatch, 500);
159     assertNotInterrupted();
160 
161     // finish locking thread
162     lockThread.interrupt();
163   }
164 
testTryLockTimeoutNotExceeded()165   public void testTryLockTimeoutNotExceeded() {
166     Stopwatch stopwatch = Stopwatch.createStarted();
167     Lock lock = new ReentrantLock();
168     acquireFor(lock, 500, MILLISECONDS);
169 
170     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS);
171 
172     assertTrue(signaledBeforeTimeout);
173     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
174     assertNotInterrupted();
175   }
176 
testTryLockInterruptedTimeoutExceeded()177   public void testTryLockInterruptedTimeoutExceeded() {
178     Stopwatch stopwatch = Stopwatch.createStarted();
179     Lock lock = new ReentrantLock();
180     Thread lockThread = acquireFor(lock, 5, SECONDS);
181     requestInterruptIn(500);
182 
183     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1000, MILLISECONDS);
184 
185     assertFalse(signaledBeforeTimeout);
186     assertAtLeastTimePassed(stopwatch, 1000);
187     assertInterrupted();
188 
189     // finish locking thread
190     lockThread.interrupt();
191   }
192 
testTryLockInterruptedTimeoutNotExceeded()193   public void testTryLockInterruptedTimeoutNotExceeded() {
194     Stopwatch stopwatch = Stopwatch.createStarted();
195     Lock lock = new ReentrantLock();
196     acquireFor(lock, 1000, MILLISECONDS);
197     requestInterruptIn(500);
198 
199     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS);
200 
201     assertTrue(signaledBeforeTimeout);
202     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
203     assertInterrupted();
204   }
205 
206   // BlockingQueue.put() tests
testPutWithNoWait()207   public void testPutWithNoWait() {
208     Stopwatch stopwatch = Stopwatch.createStarted();
209     BlockingQueue<String> queue = new ArrayBlockingQueue<>(999);
210     putUninterruptibly(queue, "");
211     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
212     assertEquals("", queue.peek());
213   }
214 
testPutNoInterrupt()215   public void testPutNoInterrupt() {
216     TimedPutQueue queue = TimedPutQueue.createWithDelay(20);
217     queue.putSuccessfully();
218     assertNotInterrupted();
219   }
220 
testPutSingleInterrupt()221   public void testPutSingleInterrupt() {
222     TimedPutQueue queue = TimedPutQueue.createWithDelay(50);
223     requestInterruptIn(10);
224     queue.putSuccessfully();
225     assertInterrupted();
226   }
227 
testPutMultiInterrupt()228   public void testPutMultiInterrupt() {
229     TimedPutQueue queue = TimedPutQueue.createWithDelay(100);
230     repeatedlyInterruptTestThread(20, tearDownStack);
231     queue.putSuccessfully();
232     assertInterrupted();
233   }
234 
235   // BlockingQueue.take() tests
testTakeWithNoWait()236   public void testTakeWithNoWait() {
237     Stopwatch stopwatch = Stopwatch.createStarted();
238     BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
239     assertTrue(queue.offer(""));
240     assertEquals("", takeUninterruptibly(queue));
241     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
242   }
243 
testTakeNoInterrupt()244   public void testTakeNoInterrupt() {
245     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(20);
246     queue.takeSuccessfully();
247     assertNotInterrupted();
248   }
249 
testTakeSingleInterrupt()250   public void testTakeSingleInterrupt() {
251     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(50);
252     requestInterruptIn(10);
253     queue.takeSuccessfully();
254     assertInterrupted();
255   }
256 
testTakeMultiInterrupt()257   public void testTakeMultiInterrupt() {
258     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(100);
259     repeatedlyInterruptTestThread(20, tearDownStack);
260     queue.takeSuccessfully();
261     assertInterrupted();
262   }
263 
264   // join() tests
testJoinWithNoWait()265   public void testJoinWithNoWait() throws InterruptedException {
266     Stopwatch stopwatch = Stopwatch.createStarted();
267     Thread thread = new Thread(new JoinTarget(15));
268     thread.start();
269     thread.join();
270     assertFalse(thread.isAlive());
271 
272     joinUninterruptibly(thread);
273     joinUninterruptibly(thread, 0, MILLISECONDS);
274     joinUninterruptibly(thread, -42, MILLISECONDS);
275     joinUninterruptibly(thread, LONG_DELAY_MS, MILLISECONDS);
276     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
277   }
278 
testJoinNoInterrupt()279   public void testJoinNoInterrupt() {
280     TimedThread thread = TimedThread.createWithDelay(20);
281     thread.joinSuccessfully();
282     assertNotInterrupted();
283   }
284 
testJoinTimeoutNoInterruptNotExpired()285   public void testJoinTimeoutNoInterruptNotExpired() {
286     TimedThread thread = TimedThread.createWithDelay(20);
287     thread.joinSuccessfully(LONG_DELAY_MS);
288     assertNotInterrupted();
289   }
290 
testJoinTimeoutNoInterruptExpired()291   public void testJoinTimeoutNoInterruptExpired() {
292     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
293     thread.joinUnsuccessfully(30);
294     assertNotInterrupted();
295   }
296 
testJoinSingleInterrupt()297   public void testJoinSingleInterrupt() {
298     TimedThread thread = TimedThread.createWithDelay(50);
299     requestInterruptIn(10);
300     thread.joinSuccessfully();
301     assertInterrupted();
302   }
303 
testJoinTimeoutSingleInterruptNoExpire()304   public void testJoinTimeoutSingleInterruptNoExpire() {
305     TimedThread thread = TimedThread.createWithDelay(50);
306     requestInterruptIn(10);
307     thread.joinSuccessfully(LONG_DELAY_MS);
308     assertInterrupted();
309   }
310 
testJoinTimeoutSingleInterruptExpired()311   public void testJoinTimeoutSingleInterruptExpired() {
312     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
313     requestInterruptIn(10);
314     thread.joinUnsuccessfully(50);
315     assertInterrupted();
316   }
317 
testJoinMultiInterrupt()318   public void testJoinMultiInterrupt() {
319     TimedThread thread = TimedThread.createWithDelay(100);
320     repeatedlyInterruptTestThread(20, tearDownStack);
321     thread.joinSuccessfully();
322     assertInterrupted();
323   }
324 
testJoinTimeoutMultiInterruptNoExpire()325   public void testJoinTimeoutMultiInterruptNoExpire() {
326     TimedThread thread = TimedThread.createWithDelay(100);
327     repeatedlyInterruptTestThread(20, tearDownStack);
328     thread.joinSuccessfully(LONG_DELAY_MS);
329     assertInterrupted();
330   }
331 
testJoinTimeoutMultiInterruptExpired()332   public void testJoinTimeoutMultiInterruptExpired() {
333     /*
334      * We don't "need" to schedule a thread completion at all here, but by doing
335      * so, we come the closest we can to testing that the wait time is
336      * appropriately decreased on each progressive join() call.
337      */
338     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
339     repeatedlyInterruptTestThread(20, tearDownStack);
340     thread.joinUnsuccessfully(70);
341     assertInterrupted();
342   }
343 
344   // sleep() Tests
testSleepNoInterrupt()345   public void testSleepNoInterrupt() {
346     sleepSuccessfully(10);
347   }
348 
testSleepSingleInterrupt()349   public void testSleepSingleInterrupt() {
350     requestInterruptIn(10);
351     sleepSuccessfully(50);
352     assertInterrupted();
353   }
354 
testSleepMultiInterrupt()355   public void testSleepMultiInterrupt() {
356     repeatedlyInterruptTestThread(10, tearDownStack);
357     sleepSuccessfully(100);
358     assertInterrupted();
359   }
360 
361   // Semaphore.tryAcquire() tests
testTryAcquireWithNoWait()362   public void testTryAcquireWithNoWait() {
363     Stopwatch stopwatch = Stopwatch.createStarted();
364     Semaphore semaphore = new Semaphore(99);
365     assertTrue(tryAcquireUninterruptibly(semaphore, 0, MILLISECONDS));
366     assertTrue(tryAcquireUninterruptibly(semaphore, -42, MILLISECONDS));
367     assertTrue(tryAcquireUninterruptibly(semaphore, LONG_DELAY_MS, MILLISECONDS));
368     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
369   }
370 
testTryAcquireTimeoutNoInterruptNotExpired()371   public void testTryAcquireTimeoutNoInterruptNotExpired() {
372     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20);
373     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
374     assertNotInterrupted();
375   }
376 
testTryAcquireTimeoutNoInterruptExpired()377   public void testTryAcquireTimeoutNoInterruptExpired() {
378     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
379     semaphore.tryAcquireUnsuccessfully(30);
380     assertNotInterrupted();
381   }
382 
testTryAcquireTimeoutSingleInterruptNoExpire()383   public void testTryAcquireTimeoutSingleInterruptNoExpire() {
384     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50);
385     requestInterruptIn(10);
386     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
387     assertInterrupted();
388   }
389 
testTryAcquireTimeoutSingleInterruptExpired()390   public void testTryAcquireTimeoutSingleInterruptExpired() {
391     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
392     requestInterruptIn(10);
393     semaphore.tryAcquireUnsuccessfully(50);
394     assertInterrupted();
395   }
396 
testTryAcquireTimeoutMultiInterruptNoExpire()397   public void testTryAcquireTimeoutMultiInterruptNoExpire() {
398     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100);
399     repeatedlyInterruptTestThread(20, tearDownStack);
400     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
401     assertInterrupted();
402   }
403 
testTryAcquireTimeoutMultiInterruptExpired()404   public void testTryAcquireTimeoutMultiInterruptExpired() {
405     /*
406      * We don't "need" to schedule a release() call at all here, but by doing
407      * so, we come the closest we can to testing that the wait time is
408      * appropriately decreased on each progressive tryAcquire() call.
409      */
410     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
411     repeatedlyInterruptTestThread(20, tearDownStack);
412     semaphore.tryAcquireUnsuccessfully(70);
413     assertInterrupted();
414   }
415 
testTryAcquireWithNoWaitMultiPermit()416   public void testTryAcquireWithNoWaitMultiPermit() {
417     Stopwatch stopwatch = Stopwatch.createStarted();
418     Semaphore semaphore = new Semaphore(99);
419     assertTrue(tryAcquireUninterruptibly(semaphore, 10, 0, MILLISECONDS));
420     assertTrue(tryAcquireUninterruptibly(semaphore, 10, -42, MILLISECONDS));
421     assertTrue(tryAcquireUninterruptibly(semaphore, 10, LONG_DELAY_MS, MILLISECONDS));
422     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
423   }
424 
testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit()425   public void testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit() {
426     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20);
427     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
428     assertNotInterrupted();
429   }
430 
testTryAcquireTimeoutNoInterruptExpiredMultiPermit()431   public void testTryAcquireTimeoutNoInterruptExpiredMultiPermit() {
432     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
433     semaphore.tryAcquireUnsuccessfully(10, 30);
434     assertNotInterrupted();
435   }
436 
testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit()437   public void testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit() {
438     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50);
439     requestInterruptIn(10);
440     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
441     assertInterrupted();
442   }
443 
testTryAcquireTimeoutSingleInterruptExpiredMultiPermit()444   public void testTryAcquireTimeoutSingleInterruptExpiredMultiPermit() {
445     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
446     requestInterruptIn(10);
447     semaphore.tryAcquireUnsuccessfully(10, 50);
448     assertInterrupted();
449   }
450 
testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit()451   public void testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit() {
452     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100);
453     repeatedlyInterruptTestThread(20, tearDownStack);
454     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
455     assertInterrupted();
456   }
457 
testTryAcquireTimeoutMultiInterruptExpiredMultiPermit()458   public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() {
459     /*
460      * We don't "need" to schedule a release() call at all here, but by doing
461      * so, we come the closest we can to testing that the wait time is
462      * appropriately decreased on each progressive tryAcquire() call.
463      */
464     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
465     repeatedlyInterruptTestThread(20, tearDownStack);
466     semaphore.tryAcquireUnsuccessfully(10, 70);
467     assertInterrupted();
468   }
469 
470   // executor.awaitTermination Testcases
testTryAwaitTerminationUninterruptiblyLongTimeUnit_success()471   public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_success() {
472     ExecutorService executor = newFixedThreadPool(1);
473     requestInterruptIn(500);
474     executor.execute(new SleepTask(1000));
475     executor.shutdown();
476     assertTrue(awaitTerminationUninterruptibly(executor, LONG_DELAY_MS, MILLISECONDS));
477     assertTrue(executor.isTerminated());
478     assertInterrupted();
479   }
480 
testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure()481   public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure() {
482     ExecutorService executor = newFixedThreadPool(1);
483     requestInterruptIn(500);
484     executor.execute(new SleepTask(10000));
485     executor.shutdown();
486     assertFalse(awaitTerminationUninterruptibly(executor, 1000, MILLISECONDS));
487     assertFalse(executor.isTerminated());
488     assertInterrupted();
489   }
490 
testTryAwaitTerminationInfiniteTimeout()491   public void testTryAwaitTerminationInfiniteTimeout() {
492     ExecutorService executor = newFixedThreadPool(1);
493     requestInterruptIn(500);
494     executor.execute(new SleepTask(1000));
495     executor.shutdown();
496     awaitTerminationUninterruptibly(executor);
497     assertTrue(executor.isTerminated());
498     assertInterrupted();
499   }
500 
501   /**
502    * Wrapper around {@link Stopwatch} which also contains an "expected completion time." Creating a
503    * {@code Completion} starts the underlying stopwatch.
504    */
505   private static final class Completion {
506     final Stopwatch stopwatch;
507     final long expectedCompletionWaitMillis;
508 
Completion(long expectedCompletionWaitMillis)509     Completion(long expectedCompletionWaitMillis) {
510       this.expectedCompletionWaitMillis = expectedCompletionWaitMillis;
511       stopwatch = Stopwatch.createStarted();
512     }
513 
514     /**
515      * Asserts that the expected completion time has passed (and not "too much" time beyond that).
516      */
assertCompletionExpected()517     void assertCompletionExpected() {
518       assertAtLeastTimePassed(stopwatch, expectedCompletionWaitMillis);
519       assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis + LONG_DELAY_MS);
520     }
521 
522     /**
523      * Asserts that at least {@code timeout} has passed but the expected completion time has not.
524      */
assertCompletionNotExpected(long timeout)525     void assertCompletionNotExpected(long timeout) {
526       Preconditions.checkArgument(timeout < expectedCompletionWaitMillis);
527       assertAtLeastTimePassed(stopwatch, timeout);
528       assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis);
529     }
530   }
531 
532   private static void assertAtLeastTimePassed(Stopwatch stopwatch, long expectedMillis) {
533     long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
534     /*
535      * The "+ 5" below is to permit, say, sleep(10) to sleep only 9 milliseconds. We see such
536      * behavior sometimes when running these tests publicly as part of Guava. "+ 5" is probably more
537      * generous than it needs to be.
538      */
539     assertTrue(
540         "Expected elapsed millis to be >= " + expectedMillis + " but was " + elapsedMillis,
541         elapsedMillis + 5 >= expectedMillis);
542   }
543 
544   // TODO(cpovirk): Split this into separate CountDownLatch and IncrementableCountDownLatch classes.
545 
546   /** Manages a {@link BlockingQueue} and associated timings for a {@code put} call. */
547   private static final class TimedPutQueue {
548     final BlockingQueue<String> queue;
549     final Completion completed;
550 
551     /**
552      * Creates a {@link EnableWrites} which open up a spot for a {@code put} to succeed in {@code
553      * countdownInMillis}.
554      */
createWithDelay(long countdownInMillis)555     static TimedPutQueue createWithDelay(long countdownInMillis) {
556       return new TimedPutQueue(countdownInMillis);
557     }
558 
TimedPutQueue(long countdownInMillis)559     private TimedPutQueue(long countdownInMillis) {
560       this.queue = new ArrayBlockingQueue<>(1);
561       assertTrue(queue.offer("blocksPutCallsUntilRemoved"));
562       this.completed = new Completion(countdownInMillis);
563       scheduleEnableWrites(this.queue, countdownInMillis);
564     }
565 
566     /** Perform a {@code put} and assert that operation completed in the expected timeframe. */
putSuccessfully()567     void putSuccessfully() {
568       putUninterruptibly(queue, "");
569       completed.assertCompletionExpected();
570       assertEquals("", queue.peek());
571     }
572 
scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis)573     private static void scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis) {
574       Runnable toRun = new EnableWrites(queue, countdownInMillis);
575       // TODO(cpovirk): automatically fail the test if this thread throws
576       Thread enablerThread = new Thread(toRun);
577       enablerThread.start();
578     }
579   }
580 
581   /** Manages a {@link BlockingQueue} and associated timings for a {@code take} call. */
582   private static final class TimedTakeQueue {
583     final BlockingQueue<String> queue;
584     final Completion completed;
585 
586     /**
587      * Creates a {@link EnableReads} which insert an element for a {@code take} to receive in {@code
588      * countdownInMillis}.
589      */
createWithDelay(long countdownInMillis)590     static TimedTakeQueue createWithDelay(long countdownInMillis) {
591       return new TimedTakeQueue(countdownInMillis);
592     }
593 
TimedTakeQueue(long countdownInMillis)594     private TimedTakeQueue(long countdownInMillis) {
595       this.queue = new ArrayBlockingQueue<>(1);
596       this.completed = new Completion(countdownInMillis);
597       scheduleEnableReads(this.queue, countdownInMillis);
598     }
599 
600     /** Perform a {@code take} and assert that operation completed in the expected timeframe. */
takeSuccessfully()601     void takeSuccessfully() {
602       assertEquals(EXPECTED_TAKE, takeUninterruptibly(queue));
603       completed.assertCompletionExpected();
604       assertTrue(queue.isEmpty());
605     }
606 
scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis)607     private static void scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis) {
608       Runnable toRun = new EnableReads(queue, countdownInMillis);
609       // TODO(cpovirk): automatically fail the test if this thread throws
610       Thread enablerThread = new Thread(toRun);
611       enablerThread.start();
612     }
613   }
614 
615   /** Manages a {@link Semaphore} and associated timings. */
616   private static final class TimedSemaphore {
617     final Semaphore semaphore;
618     final Completion completed;
619 
620     /**
621      * Create a {@link Release} which will release a semaphore permit in {@code countdownInMillis}.
622      */
createWithDelay(long countdownInMillis)623     static TimedSemaphore createWithDelay(long countdownInMillis) {
624       return new TimedSemaphore(countdownInMillis);
625     }
626 
TimedSemaphore(long countdownInMillis)627     private TimedSemaphore(long countdownInMillis) {
628       this.semaphore = new Semaphore(0);
629       this.completed = new Completion(countdownInMillis);
630       scheduleRelease(countdownInMillis);
631     }
632 
633     /**
634      * Requests a permit from the semaphore with a timeout and asserts that operation completed in
635      * the expected timeframe.
636      */
tryAcquireSuccessfully(long timeoutMillis)637     void tryAcquireSuccessfully(long timeoutMillis) {
638       assertTrue(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS));
639       completed.assertCompletionExpected();
640     }
641 
tryAcquireSuccessfully(int permits, long timeoutMillis)642     void tryAcquireSuccessfully(int permits, long timeoutMillis) {
643       assertTrue(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS));
644       completed.assertCompletionExpected();
645     }
646 
647     /**
648      * Requests a permit from the semaphore with a timeout and asserts that the wait returned within
649      * the expected timeout.
650      */
tryAcquireUnsuccessfully(long timeoutMillis)651     private void tryAcquireUnsuccessfully(long timeoutMillis) {
652       assertFalse(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS));
653       completed.assertCompletionNotExpected(timeoutMillis);
654     }
655 
tryAcquireUnsuccessfully(int permits, long timeoutMillis)656     private void tryAcquireUnsuccessfully(int permits, long timeoutMillis) {
657       assertFalse(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS));
658       completed.assertCompletionNotExpected(timeoutMillis);
659     }
660 
scheduleRelease(long countdownInMillis)661     private void scheduleRelease(long countdownInMillis) {
662       DelayedActionRunnable toRun = new Release(semaphore, countdownInMillis);
663       // TODO(cpovirk): automatically fail the test if this thread throws
664       Thread releaserThread = new Thread(toRun);
665       releaserThread.start();
666     }
667   }
668 
669   private abstract static class DelayedActionRunnable implements Runnable {
670     private final long tMinus;
671 
DelayedActionRunnable(long tMinus)672     protected DelayedActionRunnable(long tMinus) {
673       this.tMinus = tMinus;
674     }
675 
676     @Override
run()677     public final void run() {
678       try {
679         Thread.sleep(tMinus);
680       } catch (InterruptedException e) {
681         throw new AssertionError(e);
682       }
683       doAction();
684     }
685 
doAction()686     protected abstract void doAction();
687   }
688 
689   private static class CountDown extends DelayedActionRunnable {
690     private final CountDownLatch latch;
691 
CountDown(CountDownLatch latch, long tMinus)692     public CountDown(CountDownLatch latch, long tMinus) {
693       super(tMinus);
694       this.latch = latch;
695     }
696 
697     @Override
doAction()698     protected void doAction() {
699       latch.countDown();
700     }
701   }
702 
703   private static class EnableWrites extends DelayedActionRunnable {
704     private final BlockingQueue<String> queue;
705 
EnableWrites(BlockingQueue<String> queue, long tMinus)706     public EnableWrites(BlockingQueue<String> queue, long tMinus) {
707       super(tMinus);
708       assertFalse(queue.isEmpty());
709       assertFalse(queue.offer("shouldBeRejected"));
710       this.queue = queue;
711     }
712 
713     @Override
doAction()714     protected void doAction() {
715       assertNotNull(queue.remove());
716     }
717   }
718 
719   private static class EnableReads extends DelayedActionRunnable {
720     private final BlockingQueue<String> queue;
721 
EnableReads(BlockingQueue<String> queue, long tMinus)722     public EnableReads(BlockingQueue<String> queue, long tMinus) {
723       super(tMinus);
724       assertTrue(queue.isEmpty());
725       this.queue = queue;
726     }
727 
728     @Override
doAction()729     protected void doAction() {
730       assertTrue(queue.offer(EXPECTED_TAKE));
731     }
732   }
733 
734   private static final class TimedThread {
735     private final Thread thread;
736     private final Completion completed;
737 
createWithDelay(long countdownInMillis)738     static TimedThread createWithDelay(long countdownInMillis) {
739       return new TimedThread(countdownInMillis);
740     }
741 
TimedThread(long expectedCompletionWaitMillis)742     private TimedThread(long expectedCompletionWaitMillis) {
743       completed = new Completion(expectedCompletionWaitMillis);
744       thread = new Thread(new JoinTarget(expectedCompletionWaitMillis));
745       thread.start();
746     }
747 
joinSuccessfully()748     void joinSuccessfully() {
749       Uninterruptibles.joinUninterruptibly(thread);
750       completed.assertCompletionExpected();
751       assertEquals(Thread.State.TERMINATED, thread.getState());
752     }
753 
joinSuccessfully(long timeoutMillis)754     void joinSuccessfully(long timeoutMillis) {
755       Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
756       completed.assertCompletionExpected();
757       assertEquals(Thread.State.TERMINATED, thread.getState());
758     }
759 
joinUnsuccessfully(long timeoutMillis)760     void joinUnsuccessfully(long timeoutMillis) {
761       Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
762       completed.assertCompletionNotExpected(timeoutMillis);
763       assertFalse(Thread.State.TERMINATED.equals(thread.getState()));
764     }
765   }
766 
767   private static class JoinTarget extends DelayedActionRunnable {
JoinTarget(long tMinus)768     public JoinTarget(long tMinus) {
769       super(tMinus);
770     }
771 
772     @Override
doAction()773     protected void doAction() {}
774   }
775 
776   private static class Release extends DelayedActionRunnable {
777     private final Semaphore semaphore;
778 
Release(Semaphore semaphore, long tMinus)779     public Release(Semaphore semaphore, long tMinus) {
780       super(tMinus);
781       this.semaphore = semaphore;
782     }
783 
784     @Override
doAction()785     protected void doAction() {
786       semaphore.release(10);
787     }
788   }
789 
790   private static final class SleepTask extends DelayedActionRunnable {
SleepTask(long tMinus)791     SleepTask(long tMinus) {
792       super(tMinus);
793     }
794 
795     @Override
doAction()796     protected void doAction() {}
797   }
798 
sleepSuccessfully(long sleepMillis)799   private static void sleepSuccessfully(long sleepMillis) {
800     Completion completed = new Completion(sleepMillis - SLEEP_SLACK);
801     Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS);
802     completed.assertCompletionExpected();
803   }
804 
assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis)805   private static void assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis) {
806     long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
807     assertTrue(elapsedMillis < timelimitMillis);
808   }
809 
810   /**
811    * Await an interrupt, then clear the interrupt status. Similar to {@code
812    * assertTrue(Thread.interrupted())} except that this version tolerates late interrupts.
813    */
814   private static void assertInterrupted() {
815     try {
816       /*
817        * The sleep() will end immediately if we've already been interrupted or
818        * wait patiently for the interrupt if not.
819        */
820       Thread.sleep(LONG_DELAY_MS);
821       fail("Dude, where's my interrupt?");
822     } catch (InterruptedException expected) {
823     }
824   }
825 
826   private static void assertNotInterrupted() {
827     assertFalse(Thread.interrupted());
828   }
829 
830   private static void requestInterruptIn(long millis) {
831     InterruptionUtil.requestInterruptIn(millis, MILLISECONDS);
832   }
833 
834   @CanIgnoreReturnValue
835   private static Thread acquireFor(final Lock lock, final long duration, final TimeUnit unit) {
836     final CountDownLatch latch = new CountDownLatch(1);
837     Thread thread =
838         new Thread() {
839           @Override
840           public void run() {
841             lock.lock();
842             latch.countDown();
843             try {
844               Thread.sleep(unit.toMillis(duration));
845             } catch (InterruptedException e) {
846               // simply finish execution
847             } finally {
848               lock.unlock();
849             }
850           }
851         };
852     thread.setDaemon(true);
853     thread.start();
854     awaitUninterruptibly(latch);
855     return thread;
856   }
857 
858   private static class TestCondition implements Condition {
859     private final Lock lock;
860     private final Condition condition;
861 
862     private TestCondition(Lock lock, Condition condition) {
863       this.lock = lock;
864       this.condition = condition;
865     }
866 
867     static TestCondition createAndSignalAfter(long delay, TimeUnit unit) {
868       final TestCondition testCondition = create();
869 
870       ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1);
871       // If signal() fails somehow, we should see a failed test, even without looking at the Future.
872       Future<?> unused =
873           scheduledPool.schedule(
874               new Runnable() {
875                 @Override
876                 public void run() {
877                   testCondition.signal();
878                 }
879               },
880               delay,
881               unit);
882 
883       return testCondition;
884     }
885 
886     static TestCondition create() {
887       Lock lock = new ReentrantLock();
888       Condition condition = lock.newCondition();
889       return new TestCondition(lock, condition);
890     }
891 
892     @Override
893     public void await() throws InterruptedException {
894       lock.lock();
895       try {
896         condition.await();
897       } finally {
898         lock.unlock();
899       }
900     }
901 
902     @Override
903     public boolean await(long time, TimeUnit unit) throws InterruptedException {
904       lock.lock();
905       try {
906         return condition.await(time, unit);
907       } finally {
908         lock.unlock();
909       }
910     }
911 
912     @Override
913     public void awaitUninterruptibly() {
914       lock.lock();
915       try {
916         condition.awaitUninterruptibly();
917       } finally {
918         lock.unlock();
919       }
920     }
921 
922     @Override
923     public long awaitNanos(long nanosTimeout) throws InterruptedException {
924       lock.lock();
925       try {
926         return condition.awaitNanos(nanosTimeout);
927       } finally {
928         lock.unlock();
929       }
930     }
931 
932     @Override
933     public boolean awaitUntil(Date deadline) throws InterruptedException {
934       lock.lock();
935       try {
936         return condition.awaitUntil(deadline);
937       } finally {
938         lock.unlock();
939       }
940     }
941 
942     @Override
943     public void signal() {
944       lock.lock();
945       try {
946         condition.signal();
947       } finally {
948         lock.unlock();
949       }
950     }
951 
952     @Override
953     public void signalAll() {
954       lock.lock();
955       try {
956         condition.signalAll();
957       } finally {
958         lock.unlock();
959       }
960     }
961   }
962 }
963