• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  * use this file except in compliance with the License. You may obtain a copy of
6  * the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations under
14  * the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread;
20 import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly;
21 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
22 import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly;
23 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
24 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
25 import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly;
26 import static com.google.common.util.concurrent.Uninterruptibles.tryLockUninterruptibly;
27 import static java.util.concurrent.Executors.newFixedThreadPool;
28 import static java.util.concurrent.TimeUnit.MILLISECONDS;
29 import static java.util.concurrent.TimeUnit.SECONDS;
30 
31 import com.google.common.base.Preconditions;
32 import com.google.common.base.Stopwatch;
33 import com.google.common.testing.NullPointerTester;
34 import com.google.common.testing.TearDown;
35 import com.google.common.testing.TearDownStack;
36 import com.google.errorprone.annotations.CanIgnoreReturnValue;
37 import java.time.Duration;
38 import java.util.Date;
39 import java.util.concurrent.ArrayBlockingQueue;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.Future;
45 import java.util.concurrent.ScheduledExecutorService;
46 import java.util.concurrent.Semaphore;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.locks.Condition;
49 import java.util.concurrent.locks.Lock;
50 import java.util.concurrent.locks.ReentrantLock;
51 import junit.framework.TestCase;
52 
53 /**
54  * Tests for {@link Uninterruptibles}.
55  *
56  * @author Anthony Zana
57  */
58 public class UninterruptiblesTest extends TestCase {
59   private static final String EXPECTED_TAKE = "expectedTake";
60 
61   /** Timeout to use when we don't expect the timeout to expire. */
62   private static final long LONG_DELAY_MS = 2500;
63 
64   private static final long SLEEP_SLACK = 2;
65 
66   private final TearDownStack tearDownStack = new TearDownStack();
67 
68   // NOTE: All durations in these tests are expressed in milliseconds
69   @Override
setUp()70   protected void setUp() {
71     // Clear any previous interrupt before running the test.
72     if (Thread.currentThread().isInterrupted()) {
73       throw new AssertionError(
74           "Thread interrupted on test entry. "
75               + "Some test probably didn't clear the interrupt state");
76     }
77 
78     tearDownStack.addTearDown(
79         new TearDown() {
80           @Override
81           public void tearDown() {
82             Thread.interrupted();
83           }
84         });
85   }
86 
87   @Override
tearDown()88   protected void tearDown() {
89     tearDownStack.runTearDown();
90   }
91 
testNull()92   public void testNull() throws Exception {
93     new NullPointerTester()
94         .setDefault(CountDownLatch.class, new CountDownLatch(0))
95         .setDefault(Semaphore.class, new Semaphore(999))
96         .testAllPublicStaticMethods(Uninterruptibles.class);
97   }
98 
99   // IncrementableCountDownLatch.await() tests
100 
101   // CountDownLatch.await() tests
102 
103   // Condition.await() tests
testConditionAwaitTimeoutExceeded()104   public void testConditionAwaitTimeoutExceeded() {
105     Stopwatch stopwatch = Stopwatch.createStarted();
106     Condition condition = TestCondition.create();
107 
108     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 500, MILLISECONDS);
109 
110     assertFalse(signaledBeforeTimeout);
111     assertAtLeastTimePassed(stopwatch, 500);
112     assertNotInterrupted();
113   }
114 
testConditionAwaitTimeoutNotExceeded()115   public void testConditionAwaitTimeoutNotExceeded() {
116     Stopwatch stopwatch = Stopwatch.createStarted();
117     Condition condition = TestCondition.createAndSignalAfter(500, MILLISECONDS);
118 
119     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS);
120 
121     assertTrue(signaledBeforeTimeout);
122     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
123     assertNotInterrupted();
124   }
125 
testConditionAwaitInterruptedTimeoutExceeded()126   public void testConditionAwaitInterruptedTimeoutExceeded() {
127     Stopwatch stopwatch = Stopwatch.createStarted();
128     Condition condition = TestCondition.create();
129     requestInterruptIn(500);
130 
131     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1000, MILLISECONDS);
132 
133     assertFalse(signaledBeforeTimeout);
134     assertAtLeastTimePassed(stopwatch, 1000);
135     assertInterrupted();
136   }
137 
testConditionAwaitInterruptedTimeoutNotExceeded()138   public void testConditionAwaitInterruptedTimeoutNotExceeded() {
139     Stopwatch stopwatch = Stopwatch.createStarted();
140     Condition condition = TestCondition.createAndSignalAfter(1000, MILLISECONDS);
141     requestInterruptIn(500);
142 
143     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS);
144 
145     assertTrue(signaledBeforeTimeout);
146     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
147     assertInterrupted();
148   }
149 
150   // Lock.tryLock() tests
testTryLockTimeoutExceeded()151   public void testTryLockTimeoutExceeded() {
152     Stopwatch stopwatch = Stopwatch.createStarted();
153     Lock lock = new ReentrantLock();
154     Thread lockThread = acquireFor(lock, 5, SECONDS);
155 
156     boolean lockAcquired = tryLockUninterruptibly(lock, 500, MILLISECONDS);
157 
158     assertFalse(lockAcquired);
159     assertAtLeastTimePassed(stopwatch, 500);
160     assertNotInterrupted();
161 
162     // finish locking thread
163     lockThread.interrupt();
164   }
165 
testTryLockTimeoutNotExceeded()166   public void testTryLockTimeoutNotExceeded() {
167     Stopwatch stopwatch = Stopwatch.createStarted();
168     Lock lock = new ReentrantLock();
169     acquireFor(lock, 500, MILLISECONDS);
170 
171     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS);
172 
173     assertTrue(signaledBeforeTimeout);
174     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
175     assertNotInterrupted();
176   }
177 
testTryLockInterruptedTimeoutExceeded()178   public void testTryLockInterruptedTimeoutExceeded() {
179     Stopwatch stopwatch = Stopwatch.createStarted();
180     Lock lock = new ReentrantLock();
181     Thread lockThread = acquireFor(lock, 5, SECONDS);
182     requestInterruptIn(500);
183 
184     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1000, MILLISECONDS);
185 
186     assertFalse(signaledBeforeTimeout);
187     assertAtLeastTimePassed(stopwatch, 1000);
188     assertInterrupted();
189 
190     // finish locking thread
191     lockThread.interrupt();
192   }
193 
testTryLockInterruptedTimeoutNotExceeded()194   public void testTryLockInterruptedTimeoutNotExceeded() {
195     Stopwatch stopwatch = Stopwatch.createStarted();
196     Lock lock = new ReentrantLock();
197     acquireFor(lock, 1000, MILLISECONDS);
198     requestInterruptIn(500);
199 
200     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS);
201 
202     assertTrue(signaledBeforeTimeout);
203     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
204     assertInterrupted();
205   }
206 
207   // BlockingQueue.put() tests
testPutWithNoWait()208   public void testPutWithNoWait() {
209     Stopwatch stopwatch = Stopwatch.createStarted();
210     BlockingQueue<String> queue = new ArrayBlockingQueue<>(999);
211     putUninterruptibly(queue, "");
212     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
213     assertEquals("", queue.peek());
214   }
215 
testPutNoInterrupt()216   public void testPutNoInterrupt() {
217     TimedPutQueue queue = TimedPutQueue.createWithDelay(20);
218     queue.putSuccessfully();
219     assertNotInterrupted();
220   }
221 
testPutSingleInterrupt()222   public void testPutSingleInterrupt() {
223     TimedPutQueue queue = TimedPutQueue.createWithDelay(50);
224     requestInterruptIn(10);
225     queue.putSuccessfully();
226     assertInterrupted();
227   }
228 
testPutMultiInterrupt()229   public void testPutMultiInterrupt() {
230     TimedPutQueue queue = TimedPutQueue.createWithDelay(100);
231     repeatedlyInterruptTestThread(20, tearDownStack);
232     queue.putSuccessfully();
233     assertInterrupted();
234   }
235 
236   // BlockingQueue.take() tests
testTakeWithNoWait()237   public void testTakeWithNoWait() {
238     Stopwatch stopwatch = Stopwatch.createStarted();
239     BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
240     assertTrue(queue.offer(""));
241     assertEquals("", takeUninterruptibly(queue));
242     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
243   }
244 
testTakeNoInterrupt()245   public void testTakeNoInterrupt() {
246     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(20);
247     queue.takeSuccessfully();
248     assertNotInterrupted();
249   }
250 
testTakeSingleInterrupt()251   public void testTakeSingleInterrupt() {
252     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(50);
253     requestInterruptIn(10);
254     queue.takeSuccessfully();
255     assertInterrupted();
256   }
257 
testTakeMultiInterrupt()258   public void testTakeMultiInterrupt() {
259     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(100);
260     repeatedlyInterruptTestThread(20, tearDownStack);
261     queue.takeSuccessfully();
262     assertInterrupted();
263   }
264 
265   // join() tests
testJoinWithNoWait()266   public void testJoinWithNoWait() throws InterruptedException {
267     Stopwatch stopwatch = Stopwatch.createStarted();
268     Thread thread = new Thread(new JoinTarget(15));
269     thread.start();
270     thread.join();
271     assertFalse(thread.isAlive());
272 
273     joinUninterruptibly(thread);
274     joinUninterruptibly(thread, 0, MILLISECONDS);
275     joinUninterruptibly(thread, -42, MILLISECONDS);
276     joinUninterruptibly(thread, LONG_DELAY_MS, MILLISECONDS);
277     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
278   }
279 
testJoinNoInterrupt()280   public void testJoinNoInterrupt() {
281     TimedThread thread = TimedThread.createWithDelay(20);
282     thread.joinSuccessfully();
283     assertNotInterrupted();
284   }
285 
testJoinTimeoutNoInterruptNotExpired()286   public void testJoinTimeoutNoInterruptNotExpired() {
287     TimedThread thread = TimedThread.createWithDelay(20);
288     thread.joinSuccessfully(LONG_DELAY_MS);
289     assertNotInterrupted();
290   }
291 
testJoinTimeoutNoInterruptExpired()292   public void testJoinTimeoutNoInterruptExpired() {
293     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
294     thread.joinUnsuccessfully(30);
295     assertNotInterrupted();
296   }
297 
testJoinSingleInterrupt()298   public void testJoinSingleInterrupt() {
299     TimedThread thread = TimedThread.createWithDelay(50);
300     requestInterruptIn(10);
301     thread.joinSuccessfully();
302     assertInterrupted();
303   }
304 
testJoinTimeoutSingleInterruptNoExpire()305   public void testJoinTimeoutSingleInterruptNoExpire() {
306     TimedThread thread = TimedThread.createWithDelay(50);
307     requestInterruptIn(10);
308     thread.joinSuccessfully(LONG_DELAY_MS);
309     assertInterrupted();
310   }
311 
testJoinTimeoutSingleInterruptExpired()312   public void testJoinTimeoutSingleInterruptExpired() {
313     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
314     requestInterruptIn(10);
315     thread.joinUnsuccessfully(50);
316     assertInterrupted();
317   }
318 
testJoinMultiInterrupt()319   public void testJoinMultiInterrupt() {
320     TimedThread thread = TimedThread.createWithDelay(100);
321     repeatedlyInterruptTestThread(20, tearDownStack);
322     thread.joinSuccessfully();
323     assertInterrupted();
324   }
325 
testJoinTimeoutMultiInterruptNoExpire()326   public void testJoinTimeoutMultiInterruptNoExpire() {
327     TimedThread thread = TimedThread.createWithDelay(100);
328     repeatedlyInterruptTestThread(20, tearDownStack);
329     thread.joinSuccessfully(LONG_DELAY_MS);
330     assertInterrupted();
331   }
332 
testJoinTimeoutMultiInterruptExpired()333   public void testJoinTimeoutMultiInterruptExpired() {
334     /*
335      * We don't "need" to schedule a thread completion at all here, but by doing
336      * so, we come the closest we can to testing that the wait time is
337      * appropriately decreased on each progressive join() call.
338      */
339     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
340     repeatedlyInterruptTestThread(20, tearDownStack);
341     thread.joinUnsuccessfully(70);
342     assertInterrupted();
343   }
344 
345   // sleep() Tests
testSleepNoInterrupt()346   public void testSleepNoInterrupt() {
347     sleepSuccessfully(10);
348   }
349 
testSleepSingleInterrupt()350   public void testSleepSingleInterrupt() {
351     requestInterruptIn(10);
352     sleepSuccessfully(50);
353     assertInterrupted();
354   }
355 
testSleepMultiInterrupt()356   public void testSleepMultiInterrupt() {
357     repeatedlyInterruptTestThread(10, tearDownStack);
358     sleepSuccessfully(100);
359     assertInterrupted();
360   }
361 
362   // Semaphore.tryAcquire() tests
testTryAcquireWithNoWait()363   public void testTryAcquireWithNoWait() {
364     Stopwatch stopwatch = Stopwatch.createStarted();
365     Semaphore semaphore = new Semaphore(99);
366     assertTrue(tryAcquireUninterruptibly(semaphore, 0, MILLISECONDS));
367     assertTrue(tryAcquireUninterruptibly(semaphore, -42, MILLISECONDS));
368     assertTrue(tryAcquireUninterruptibly(semaphore, LONG_DELAY_MS, MILLISECONDS));
369     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
370   }
371 
testTryAcquireTimeoutNoInterruptNotExpired()372   public void testTryAcquireTimeoutNoInterruptNotExpired() {
373     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20);
374     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
375     assertNotInterrupted();
376   }
377 
testTryAcquireTimeoutNoInterruptExpired()378   public void testTryAcquireTimeoutNoInterruptExpired() {
379     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
380     semaphore.tryAcquireUnsuccessfully(30);
381     assertNotInterrupted();
382   }
383 
testTryAcquireTimeoutSingleInterruptNoExpire()384   public void testTryAcquireTimeoutSingleInterruptNoExpire() {
385     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50);
386     requestInterruptIn(10);
387     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
388     assertInterrupted();
389   }
390 
testTryAcquireTimeoutSingleInterruptExpired()391   public void testTryAcquireTimeoutSingleInterruptExpired() {
392     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
393     requestInterruptIn(10);
394     semaphore.tryAcquireUnsuccessfully(50);
395     assertInterrupted();
396   }
397 
testTryAcquireTimeoutMultiInterruptNoExpire()398   public void testTryAcquireTimeoutMultiInterruptNoExpire() {
399     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100);
400     repeatedlyInterruptTestThread(20, tearDownStack);
401     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
402     assertInterrupted();
403   }
404 
testTryAcquireTimeoutMultiInterruptExpired()405   public void testTryAcquireTimeoutMultiInterruptExpired() {
406     /*
407      * We don't "need" to schedule a release() call at all here, but by doing
408      * so, we come the closest we can to testing that the wait time is
409      * appropriately decreased on each progressive tryAcquire() call.
410      */
411     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
412     repeatedlyInterruptTestThread(20, tearDownStack);
413     semaphore.tryAcquireUnsuccessfully(70);
414     assertInterrupted();
415   }
416 
testTryAcquireWithNoWaitMultiPermit()417   public void testTryAcquireWithNoWaitMultiPermit() {
418     Stopwatch stopwatch = Stopwatch.createStarted();
419     Semaphore semaphore = new Semaphore(99);
420     assertTrue(tryAcquireUninterruptibly(semaphore, 10, 0, MILLISECONDS));
421     assertTrue(tryAcquireUninterruptibly(semaphore, 10, -42, MILLISECONDS));
422     assertTrue(tryAcquireUninterruptibly(semaphore, 10, LONG_DELAY_MS, MILLISECONDS));
423     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
424   }
425 
testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit()426   public void testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit() {
427     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20);
428     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
429     assertNotInterrupted();
430   }
431 
testTryAcquireTimeoutNoInterruptExpiredMultiPermit()432   public void testTryAcquireTimeoutNoInterruptExpiredMultiPermit() {
433     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
434     semaphore.tryAcquireUnsuccessfully(10, 30);
435     assertNotInterrupted();
436   }
437 
testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit()438   public void testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit() {
439     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50);
440     requestInterruptIn(10);
441     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
442     assertInterrupted();
443   }
444 
testTryAcquireTimeoutSingleInterruptExpiredMultiPermit()445   public void testTryAcquireTimeoutSingleInterruptExpiredMultiPermit() {
446     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
447     requestInterruptIn(10);
448     semaphore.tryAcquireUnsuccessfully(10, 50);
449     assertInterrupted();
450   }
451 
testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit()452   public void testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit() {
453     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100);
454     repeatedlyInterruptTestThread(20, tearDownStack);
455     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
456     assertInterrupted();
457   }
458 
testTryAcquireTimeoutMultiInterruptExpiredMultiPermit()459   public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() {
460     /*
461      * We don't "need" to schedule a release() call at all here, but by doing
462      * so, we come the closest we can to testing that the wait time is
463      * appropriately decreased on each progressive tryAcquire() call.
464      */
465     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
466     repeatedlyInterruptTestThread(20, tearDownStack);
467     semaphore.tryAcquireUnsuccessfully(10, 70);
468     assertInterrupted();
469   }
470 
471   // executor.awaitTermination Testcases
testTryAwaitTerminationUninterruptiblyDuration_success()472   public void testTryAwaitTerminationUninterruptiblyDuration_success() {
473     ExecutorService executor = newFixedThreadPool(1);
474     requestInterruptIn(500);
475     executor.execute(new SleepTask(1000));
476     executor.shutdown();
477     assertTrue(awaitTerminationUninterruptibly(executor, Duration.ofMillis(LONG_DELAY_MS)));
478     assertTrue(executor.isTerminated());
479     assertInterrupted();
480   }
481 
testTryAwaitTerminationUninterruptiblyDuration_failure()482   public void testTryAwaitTerminationUninterruptiblyDuration_failure() {
483     ExecutorService executor = newFixedThreadPool(1);
484     requestInterruptIn(500);
485     executor.execute(new SleepTask(10000));
486     executor.shutdown();
487     assertFalse(awaitTerminationUninterruptibly(executor, Duration.ofMillis(1000)));
488     assertFalse(executor.isTerminated());
489     assertInterrupted();
490   }
491 
testTryAwaitTerminationUninterruptiblyLongTimeUnit_success()492   public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_success() {
493     ExecutorService executor = newFixedThreadPool(1);
494     requestInterruptIn(500);
495     executor.execute(new SleepTask(1000));
496     executor.shutdown();
497     assertTrue(awaitTerminationUninterruptibly(executor, LONG_DELAY_MS, MILLISECONDS));
498     assertTrue(executor.isTerminated());
499     assertInterrupted();
500   }
501 
testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure()502   public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure() {
503     ExecutorService executor = newFixedThreadPool(1);
504     requestInterruptIn(500);
505     executor.execute(new SleepTask(10000));
506     executor.shutdown();
507     assertFalse(awaitTerminationUninterruptibly(executor, 1000, MILLISECONDS));
508     assertFalse(executor.isTerminated());
509     assertInterrupted();
510   }
511 
testTryAwaitTerminationInfiniteTimeout()512   public void testTryAwaitTerminationInfiniteTimeout() {
513     ExecutorService executor = newFixedThreadPool(1);
514     requestInterruptIn(500);
515     executor.execute(new SleepTask(1000));
516     executor.shutdown();
517     awaitTerminationUninterruptibly(executor);
518     assertTrue(executor.isTerminated());
519     assertInterrupted();
520   }
521 
522   /**
523    * Wrapper around {@link Stopwatch} which also contains an "expected completion time." Creating a
524    * {@code Completion} starts the underlying stopwatch.
525    */
526   private static final class Completion {
527     final Stopwatch stopwatch;
528     final long expectedCompletionWaitMillis;
529 
Completion(long expectedCompletionWaitMillis)530     Completion(long expectedCompletionWaitMillis) {
531       this.expectedCompletionWaitMillis = expectedCompletionWaitMillis;
532       stopwatch = Stopwatch.createStarted();
533     }
534 
535     /**
536      * Asserts that the expected completion time has passed (and not "too much" time beyond that).
537      */
assertCompletionExpected()538     void assertCompletionExpected() {
539       assertAtLeastTimePassed(stopwatch, expectedCompletionWaitMillis);
540       assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis + LONG_DELAY_MS);
541     }
542 
543     /**
544      * Asserts that at least {@code timeout} has passed but the expected completion time has not.
545      */
assertCompletionNotExpected(long timeout)546     void assertCompletionNotExpected(long timeout) {
547       Preconditions.checkArgument(timeout < expectedCompletionWaitMillis);
548       assertAtLeastTimePassed(stopwatch, timeout);
549       assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis);
550     }
551   }
552 
553   private static void assertAtLeastTimePassed(Stopwatch stopwatch, long expectedMillis) {
554     long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
555     /*
556      * The "+ 5" below is to permit, say, sleep(10) to sleep only 9 milliseconds. We see such
557      * behavior sometimes when running these tests publicly as part of Guava. "+ 5" is probably more
558      * generous than it needs to be.
559      */
560     assertTrue(
561         "Expected elapsed millis to be >= " + expectedMillis + " but was " + elapsedMillis,
562         elapsedMillis + 5 >= expectedMillis);
563   }
564 
565   // TODO(cpovirk): Split this into separate CountDownLatch and IncrementableCountDownLatch classes.
566 
567   /** Manages a {@link BlockingQueue} and associated timings for a {@code put} call. */
568   private static final class TimedPutQueue {
569     final BlockingQueue<String> queue;
570     final Completion completed;
571 
572     /**
573      * Creates a {@link EnableWrites} which open up a spot for a {@code put} to succeed in {@code
574      * countdownInMillis}.
575      */
createWithDelay(long countdownInMillis)576     static TimedPutQueue createWithDelay(long countdownInMillis) {
577       return new TimedPutQueue(countdownInMillis);
578     }
579 
TimedPutQueue(long countdownInMillis)580     private TimedPutQueue(long countdownInMillis) {
581       this.queue = new ArrayBlockingQueue<>(1);
582       assertTrue(queue.offer("blocksPutCallsUntilRemoved"));
583       this.completed = new Completion(countdownInMillis);
584       scheduleEnableWrites(this.queue, countdownInMillis);
585     }
586 
587     /** Perform a {@code put} and assert that operation completed in the expected timeframe. */
putSuccessfully()588     void putSuccessfully() {
589       putUninterruptibly(queue, "");
590       completed.assertCompletionExpected();
591       assertEquals("", queue.peek());
592     }
593 
scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis)594     private static void scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis) {
595       Runnable toRun = new EnableWrites(queue, countdownInMillis);
596       // TODO(cpovirk): automatically fail the test if this thread throws
597       Thread enablerThread = new Thread(toRun);
598       enablerThread.start();
599     }
600   }
601 
602   /** Manages a {@link BlockingQueue} and associated timings for a {@code take} call. */
603   private static final class TimedTakeQueue {
604     final BlockingQueue<String> queue;
605     final Completion completed;
606 
607     /**
608      * Creates a {@link EnableReads} which insert an element for a {@code take} to receive in {@code
609      * countdownInMillis}.
610      */
createWithDelay(long countdownInMillis)611     static TimedTakeQueue createWithDelay(long countdownInMillis) {
612       return new TimedTakeQueue(countdownInMillis);
613     }
614 
TimedTakeQueue(long countdownInMillis)615     private TimedTakeQueue(long countdownInMillis) {
616       this.queue = new ArrayBlockingQueue<>(1);
617       this.completed = new Completion(countdownInMillis);
618       scheduleEnableReads(this.queue, countdownInMillis);
619     }
620 
621     /** Perform a {@code take} and assert that operation completed in the expected timeframe. */
takeSuccessfully()622     void takeSuccessfully() {
623       assertEquals(EXPECTED_TAKE, takeUninterruptibly(queue));
624       completed.assertCompletionExpected();
625       assertTrue(queue.isEmpty());
626     }
627 
scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis)628     private static void scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis) {
629       Runnable toRun = new EnableReads(queue, countdownInMillis);
630       // TODO(cpovirk): automatically fail the test if this thread throws
631       Thread enablerThread = new Thread(toRun);
632       enablerThread.start();
633     }
634   }
635 
636   /** Manages a {@link Semaphore} and associated timings. */
637   private static final class TimedSemaphore {
638     final Semaphore semaphore;
639     final Completion completed;
640 
641     /**
642      * Create a {@link Release} which will release a semaphore permit in {@code countdownInMillis}.
643      */
createWithDelay(long countdownInMillis)644     static TimedSemaphore createWithDelay(long countdownInMillis) {
645       return new TimedSemaphore(countdownInMillis);
646     }
647 
TimedSemaphore(long countdownInMillis)648     private TimedSemaphore(long countdownInMillis) {
649       this.semaphore = new Semaphore(0);
650       this.completed = new Completion(countdownInMillis);
651       scheduleRelease(countdownInMillis);
652     }
653 
654     /**
655      * Requests a permit from the semaphore with a timeout and asserts that operation completed in
656      * the expected timeframe.
657      */
tryAcquireSuccessfully(long timeoutMillis)658     void tryAcquireSuccessfully(long timeoutMillis) {
659       assertTrue(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS));
660       completed.assertCompletionExpected();
661     }
662 
tryAcquireSuccessfully(int permits, long timeoutMillis)663     void tryAcquireSuccessfully(int permits, long timeoutMillis) {
664       assertTrue(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS));
665       completed.assertCompletionExpected();
666     }
667 
668     /**
669      * Requests a permit from the semaphore with a timeout and asserts that the wait returned within
670      * the expected timeout.
671      */
tryAcquireUnsuccessfully(long timeoutMillis)672     private void tryAcquireUnsuccessfully(long timeoutMillis) {
673       assertFalse(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS));
674       completed.assertCompletionNotExpected(timeoutMillis);
675     }
676 
tryAcquireUnsuccessfully(int permits, long timeoutMillis)677     private void tryAcquireUnsuccessfully(int permits, long timeoutMillis) {
678       assertFalse(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS));
679       completed.assertCompletionNotExpected(timeoutMillis);
680     }
681 
scheduleRelease(long countdownInMillis)682     private void scheduleRelease(long countdownInMillis) {
683       DelayedActionRunnable toRun = new Release(semaphore, countdownInMillis);
684       // TODO(cpovirk): automatically fail the test if this thread throws
685       Thread releaserThread = new Thread(toRun);
686       releaserThread.start();
687     }
688   }
689 
690   private abstract static class DelayedActionRunnable implements Runnable {
691     private final long tMinus;
692 
DelayedActionRunnable(long tMinus)693     protected DelayedActionRunnable(long tMinus) {
694       this.tMinus = tMinus;
695     }
696 
697     @Override
run()698     public final void run() {
699       try {
700         Thread.sleep(tMinus);
701       } catch (InterruptedException e) {
702         throw new AssertionError(e);
703       }
704       doAction();
705     }
706 
doAction()707     protected abstract void doAction();
708   }
709 
710   private static class CountDown extends DelayedActionRunnable {
711     private final CountDownLatch latch;
712 
CountDown(CountDownLatch latch, long tMinus)713     public CountDown(CountDownLatch latch, long tMinus) {
714       super(tMinus);
715       this.latch = latch;
716     }
717 
718     @Override
doAction()719     protected void doAction() {
720       latch.countDown();
721     }
722   }
723 
724   private static class EnableWrites extends DelayedActionRunnable {
725     private final BlockingQueue<String> queue;
726 
EnableWrites(BlockingQueue<String> queue, long tMinus)727     public EnableWrites(BlockingQueue<String> queue, long tMinus) {
728       super(tMinus);
729       assertFalse(queue.isEmpty());
730       assertFalse(queue.offer("shouldBeRejected"));
731       this.queue = queue;
732     }
733 
734     @Override
doAction()735     protected void doAction() {
736       assertNotNull(queue.remove());
737     }
738   }
739 
740   private static class EnableReads extends DelayedActionRunnable {
741     private final BlockingQueue<String> queue;
742 
EnableReads(BlockingQueue<String> queue, long tMinus)743     public EnableReads(BlockingQueue<String> queue, long tMinus) {
744       super(tMinus);
745       assertTrue(queue.isEmpty());
746       this.queue = queue;
747     }
748 
749     @Override
doAction()750     protected void doAction() {
751       assertTrue(queue.offer(EXPECTED_TAKE));
752     }
753   }
754 
755   private static final class TimedThread {
756     private final Thread thread;
757     private final Completion completed;
758 
createWithDelay(long countdownInMillis)759     static TimedThread createWithDelay(long countdownInMillis) {
760       return new TimedThread(countdownInMillis);
761     }
762 
TimedThread(long expectedCompletionWaitMillis)763     private TimedThread(long expectedCompletionWaitMillis) {
764       completed = new Completion(expectedCompletionWaitMillis);
765       thread = new Thread(new JoinTarget(expectedCompletionWaitMillis));
766       thread.start();
767     }
768 
joinSuccessfully()769     void joinSuccessfully() {
770       Uninterruptibles.joinUninterruptibly(thread);
771       completed.assertCompletionExpected();
772       assertEquals(Thread.State.TERMINATED, thread.getState());
773     }
774 
joinSuccessfully(long timeoutMillis)775     void joinSuccessfully(long timeoutMillis) {
776       Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
777       completed.assertCompletionExpected();
778       assertEquals(Thread.State.TERMINATED, thread.getState());
779     }
780 
joinUnsuccessfully(long timeoutMillis)781     void joinUnsuccessfully(long timeoutMillis) {
782       Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
783       completed.assertCompletionNotExpected(timeoutMillis);
784       assertFalse(Thread.State.TERMINATED.equals(thread.getState()));
785     }
786   }
787 
788   private static class JoinTarget extends DelayedActionRunnable {
JoinTarget(long tMinus)789     public JoinTarget(long tMinus) {
790       super(tMinus);
791     }
792 
793     @Override
doAction()794     protected void doAction() {}
795   }
796 
797   private static class Release extends DelayedActionRunnable {
798     private final Semaphore semaphore;
799 
Release(Semaphore semaphore, long tMinus)800     public Release(Semaphore semaphore, long tMinus) {
801       super(tMinus);
802       this.semaphore = semaphore;
803     }
804 
805     @Override
doAction()806     protected void doAction() {
807       semaphore.release(10);
808     }
809   }
810 
811   private static final class SleepTask extends DelayedActionRunnable {
SleepTask(long tMinus)812     SleepTask(long tMinus) {
813       super(tMinus);
814     }
815 
816     @Override
doAction()817     protected void doAction() {}
818   }
819 
sleepSuccessfully(long sleepMillis)820   private static void sleepSuccessfully(long sleepMillis) {
821     Completion completed = new Completion(sleepMillis - SLEEP_SLACK);
822     Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS);
823     completed.assertCompletionExpected();
824   }
825 
assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis)826   private static void assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis) {
827     long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
828     assertTrue(elapsedMillis < timelimitMillis);
829   }
830 
831   /**
832    * Await an interrupt, then clear the interrupt status. Similar to {@code
833    * assertTrue(Thread.interrupted())} except that this version tolerates late interrupts.
834    */
835   private static void assertInterrupted() {
836     try {
837       /*
838        * The sleep() will end immediately if we've already been interrupted or
839        * wait patiently for the interrupt if not.
840        */
841       Thread.sleep(LONG_DELAY_MS);
842       fail("Dude, where's my interrupt?");
843     } catch (InterruptedException expected) {
844     }
845   }
846 
847   private static void assertNotInterrupted() {
848     assertFalse(Thread.interrupted());
849   }
850 
851   private static void requestInterruptIn(long millis) {
852     InterruptionUtil.requestInterruptIn(millis, MILLISECONDS);
853   }
854 
855   @CanIgnoreReturnValue
856   private static Thread acquireFor(final Lock lock, final long duration, final TimeUnit unit) {
857     final CountDownLatch latch = new CountDownLatch(1);
858     Thread thread =
859         new Thread() {
860           @Override
861           public void run() {
862             lock.lock();
863             latch.countDown();
864             try {
865               Thread.sleep(unit.toMillis(duration));
866             } catch (InterruptedException e) {
867               // simply finish execution
868             } finally {
869               lock.unlock();
870             }
871           }
872         };
873     thread.setDaemon(true);
874     thread.start();
875     awaitUninterruptibly(latch);
876     return thread;
877   }
878 
879   private static class TestCondition implements Condition {
880     private final Lock lock;
881     private final Condition condition;
882 
883     private TestCondition(Lock lock, Condition condition) {
884       this.lock = lock;
885       this.condition = condition;
886     }
887 
888     static TestCondition createAndSignalAfter(long delay, TimeUnit unit) {
889       final TestCondition testCondition = create();
890 
891       ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1);
892       // If signal() fails somehow, we should see a failed test, even without looking at the Future.
893       Future<?> unused =
894           scheduledPool.schedule(
895               new Runnable() {
896                 @Override
897                 public void run() {
898                   testCondition.signal();
899                 }
900               },
901               delay,
902               unit);
903 
904       return testCondition;
905     }
906 
907     static TestCondition create() {
908       Lock lock = new ReentrantLock();
909       Condition condition = lock.newCondition();
910       return new TestCondition(lock, condition);
911     }
912 
913     @Override
914     public void await() throws InterruptedException {
915       lock.lock();
916       try {
917         condition.await();
918       } finally {
919         lock.unlock();
920       }
921     }
922 
923     @Override
924     public boolean await(long time, TimeUnit unit) throws InterruptedException {
925       lock.lock();
926       try {
927         return condition.await(time, unit);
928       } finally {
929         lock.unlock();
930       }
931     }
932 
933     @Override
934     public void awaitUninterruptibly() {
935       lock.lock();
936       try {
937         condition.awaitUninterruptibly();
938       } finally {
939         lock.unlock();
940       }
941     }
942 
943     @Override
944     public long awaitNanos(long nanosTimeout) throws InterruptedException {
945       lock.lock();
946       try {
947         return condition.awaitNanos(nanosTimeout);
948       } finally {
949         lock.unlock();
950       }
951     }
952 
953     @Override
954     public boolean awaitUntil(Date deadline) throws InterruptedException {
955       lock.lock();
956       try {
957         return condition.awaitUntil(deadline);
958       } finally {
959         lock.unlock();
960       }
961     }
962 
963     @Override
964     public void signal() {
965       lock.lock();
966       try {
967         condition.signal();
968       } finally {
969         lock.unlock();
970       }
971     }
972 
973     @Override
974     public void signalAll() {
975       lock.lock();
976       try {
977         condition.signalAll();
978       } finally {
979         lock.unlock();
980       }
981     }
982   }
983 }
984