• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package com.google.common.collect;
18 
19 import static com.google.common.collect.Lists.newArrayList;
20 import static com.google.common.truth.Truth.assertThat;
21 import static java.lang.Long.MAX_VALUE;
22 import static java.lang.Thread.currentThread;
23 import static java.util.concurrent.Executors.newCachedThreadPool;
24 import static java.util.concurrent.TimeUnit.MILLISECONDS;
25 import static java.util.concurrent.TimeUnit.NANOSECONDS;
26 import static java.util.concurrent.TimeUnit.SECONDS;
27 import static org.junit.Assert.assertThrows;
28 
29 import com.google.common.base.Stopwatch;
30 import java.util.Collection;
31 import java.util.List;
32 import java.util.concurrent.ArrayBlockingQueue;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.LinkedBlockingDeque;
39 import java.util.concurrent.LinkedBlockingQueue;
40 import java.util.concurrent.PriorityBlockingQueue;
41 import java.util.concurrent.SynchronousQueue;
42 import java.util.concurrent.TimeUnit;
43 import junit.framework.TestCase;
44 import org.checkerframework.checker.nullness.qual.Nullable;
45 
46 /**
47  * Tests for {@link Queues}.
48  *
49  * @author Dimitris Andreou
50  */
51 public class QueuesTest extends TestCase {
52   /*
53    * All the following tests relate to BlockingQueue methods in Queues.
54    */
55 
blockingQueues()56   public static List<BlockingQueue<Object>> blockingQueues() {
57     return ImmutableList.<BlockingQueue<Object>>of(
58         new LinkedBlockingQueue<Object>(),
59         new LinkedBlockingQueue<Object>(10),
60         new SynchronousQueue<Object>(),
61         new ArrayBlockingQueue<Object>(10),
62         new LinkedBlockingDeque<Object>(),
63         new LinkedBlockingDeque<Object>(10),
64         new PriorityBlockingQueue<Object>(10, Ordering.arbitrary()));
65   }
66 
67   /*
68    * We need to perform operations in a thread pool, even for simple cases, because the queue might
69    * be a SynchronousQueue.
70    */
71   private ExecutorService threadPool;
72 
73   @Override
setUp()74   public void setUp() {
75     threadPool = newCachedThreadPool();
76   }
77 
78   @Override
tearDown()79   public void tearDown() throws InterruptedException {
80     threadPool.shutdown();
81     assertTrue("Some worker didn't finish in time", threadPool.awaitTermination(10, SECONDS));
82   }
83 
drain( BlockingQueue<T> q, Collection<? super T> buffer, int maxElements, long timeout, TimeUnit unit, boolean interruptibly)84   private static <T> int drain(
85       BlockingQueue<T> q,
86       Collection<? super T> buffer,
87       int maxElements,
88       long timeout,
89       TimeUnit unit,
90       boolean interruptibly)
91       throws InterruptedException {
92     return interruptibly
93         ? Queues.drain(q, buffer, maxElements, timeout, unit)
94         : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit);
95   }
96 
testMultipleProducers()97   public void testMultipleProducers() throws Exception {
98     for (BlockingQueue<Object> q : blockingQueues()) {
99       testMultipleProducers(q);
100     }
101   }
102 
testMultipleProducers(BlockingQueue<Object> q)103   private void testMultipleProducers(BlockingQueue<Object> q) throws InterruptedException {
104     for (boolean interruptibly : new boolean[] {true, false}) {
105       @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
106       Future<?> possiblyIgnoredError = threadPool.submit(new Producer(q, 20));
107       @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
108       Future<?> possiblyIgnoredError1 = threadPool.submit(new Producer(q, 20));
109       @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
110       Future<?> possiblyIgnoredError2 = threadPool.submit(new Producer(q, 20));
111       @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
112       Future<?> possiblyIgnoredError3 = threadPool.submit(new Producer(q, 20));
113       @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
114       Future<?> possiblyIgnoredError4 = threadPool.submit(new Producer(q, 20));
115 
116       List<Object> buf = newArrayList();
117       int elements = drain(q, buf, 100, MAX_VALUE, NANOSECONDS, interruptibly);
118       assertEquals(100, elements);
119       assertEquals(100, buf.size());
120       assertDrained(q);
121     }
122   }
123 
testDrainTimesOut()124   public void testDrainTimesOut() throws Exception {
125     for (BlockingQueue<Object> q : blockingQueues()) {
126       testDrainTimesOut(q);
127     }
128   }
129 
testDrainTimesOut(BlockingQueue<Object> q)130   private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception {
131     for (boolean interruptibly : new boolean[] {true, false}) {
132       assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, MILLISECONDS));
133 
134       Producer producer = new Producer(q, 1);
135       // producing one, will ask for two
136       Future<?> producerThread = threadPool.submit(producer);
137       producer.beganProducing.await();
138 
139       // make sure we time out
140       Stopwatch timer = Stopwatch.createStarted();
141 
142       int drained = drain(q, newArrayList(), 2, 10, MILLISECONDS, interruptibly);
143       assertThat(drained).isAtMost(1);
144 
145       assertThat(timer.elapsed(MILLISECONDS)).isAtLeast(10L);
146 
147       // If even the first one wasn't there, clean up so that the next test doesn't see an element.
148       producerThread.cancel(true);
149       producer.doneProducing.await();
150       if (drained == 0) {
151         q.poll(); // not necessarily there if producer was interrupted
152       }
153     }
154   }
155 
testZeroElements()156   public void testZeroElements() throws Exception {
157     for (BlockingQueue<Object> q : blockingQueues()) {
158       testZeroElements(q);
159     }
160   }
161 
testZeroElements(BlockingQueue<Object> q)162   private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException {
163     for (boolean interruptibly : new boolean[] {true, false}) {
164       // asking to drain zero elements
165       assertEquals(0, drain(q, ImmutableList.of(), 0, 10, MILLISECONDS, interruptibly));
166     }
167   }
168 
testEmpty()169   public void testEmpty() throws Exception {
170     for (BlockingQueue<Object> q : blockingQueues()) {
171       testEmpty(q);
172     }
173   }
174 
testEmpty(BlockingQueue<Object> q)175   private void testEmpty(BlockingQueue<Object> q) {
176     assertDrained(q);
177   }
178 
testNegativeMaxElements()179   public void testNegativeMaxElements() throws Exception {
180     for (BlockingQueue<Object> q : blockingQueues()) {
181       testNegativeMaxElements(q);
182     }
183   }
184 
testNegativeMaxElements(BlockingQueue<Object> q)185   private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException {
186     @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
187     Future<?> possiblyIgnoredError = threadPool.submit(new Producer(q, 1));
188 
189     List<Object> buf = newArrayList();
190     int elements = Queues.drain(q, buf, -1, MAX_VALUE, NANOSECONDS);
191     assertEquals(0, elements);
192     assertThat(buf).isEmpty();
193 
194     // Free the producer thread, and give subsequent tests a clean slate.
195     q.take();
196   }
197 
testDrain_throws()198   public void testDrain_throws() throws Exception {
199     for (BlockingQueue<Object> q : blockingQueues()) {
200       testDrain_throws(q);
201     }
202   }
203 
testDrain_throws(BlockingQueue<Object> q)204   private void testDrain_throws(BlockingQueue<Object> q) {
205     @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
206     Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread()));
207     try {
208       Queues.drain(q, ImmutableList.of(), 100, MAX_VALUE, NANOSECONDS);
209       fail();
210     } catch (InterruptedException expected) {
211     }
212   }
213 
testDrainUninterruptibly_doesNotThrow()214   public void testDrainUninterruptibly_doesNotThrow() throws Exception {
215     for (BlockingQueue<Object> q : blockingQueues()) {
216       testDrainUninterruptibly_doesNotThrow(q);
217     }
218   }
219 
testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q)220   private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) {
221     final Thread mainThread = currentThread();
222     @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
223     Future<?> possiblyIgnoredError =
224         threadPool.submit(
225             new Callable<@Nullable Void>() {
226               @Override
227               public @Nullable Void call() throws InterruptedException {
228                 new Producer(q, 50).call();
229                 new Interrupter(mainThread).run();
230                 new Producer(q, 50).call();
231                 return null;
232               }
233             });
234     List<Object> buf = newArrayList();
235     int elements = Queues.drainUninterruptibly(q, buf, 100, MAX_VALUE, NANOSECONDS);
236     // so when this drains all elements, we know the thread has also been interrupted in between
237     assertTrue(Thread.interrupted());
238     assertEquals(100, elements);
239     assertEquals(100, buf.size());
240   }
241 
testNewLinkedBlockingDequeCapacity()242   public void testNewLinkedBlockingDequeCapacity() {
243     assertThrows(IllegalArgumentException.class, () -> Queues.newLinkedBlockingDeque(0));
244     assertEquals(1, Queues.newLinkedBlockingDeque(1).remainingCapacity());
245     assertEquals(11, Queues.newLinkedBlockingDeque(11).remainingCapacity());
246   }
247 
testNewLinkedBlockingQueueCapacity()248   public void testNewLinkedBlockingQueueCapacity() {
249     assertThrows(IllegalArgumentException.class, () -> Queues.newLinkedBlockingQueue(0));
250     assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity());
251     assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity());
252   }
253 
254   /** Checks that #drain() invocations behave correctly for a drained (empty) queue. */
assertDrained(BlockingQueue<Object> q)255   private void assertDrained(BlockingQueue<Object> q) {
256     assertNull(q.peek());
257     assertInterruptibleDrained(q);
258     assertUninterruptibleDrained(q);
259   }
260 
assertInterruptibleDrained(BlockingQueue<Object> q)261   private void assertInterruptibleDrained(BlockingQueue<Object> q) {
262     // nothing to drain, thus this should wait doing nothing
263     try {
264       assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, MILLISECONDS));
265     } catch (InterruptedException e) {
266       throw new AssertionError();
267     }
268 
269     // but does the wait actually occurs?
270     @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
271     Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread()));
272     try {
273       // if waiting works, this should get stuck
274       Queues.drain(q, newArrayList(), 1, MAX_VALUE, NANOSECONDS);
275       fail();
276     } catch (InterruptedException expected) {
277       // we indeed waited; a slow thread had enough time to interrupt us
278     }
279   }
280 
281   // same as above; uninterruptible version
assertUninterruptibleDrained(BlockingQueue<Object> q)282   private void assertUninterruptibleDrained(BlockingQueue<Object> q) {
283     assertEquals(0, Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, MILLISECONDS));
284 
285     // but does the wait actually occurs?
286     @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
287     Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread()));
288 
289     Stopwatch timer = Stopwatch.createStarted();
290     Queues.drainUninterruptibly(q, newArrayList(), 1, 10, MILLISECONDS);
291     assertThat(timer.elapsed(MILLISECONDS)).isAtLeast(10L);
292     // wait for interrupted status and clear it
293     while (!Thread.interrupted()) {
294       Thread.yield();
295     }
296   }
297 
298   private static class Producer implements Callable<@Nullable Void> {
299     final BlockingQueue<Object> q;
300     final int elements;
301     final CountDownLatch beganProducing = new CountDownLatch(1);
302     final CountDownLatch doneProducing = new CountDownLatch(1);
303 
Producer(BlockingQueue<Object> q, int elements)304     Producer(BlockingQueue<Object> q, int elements) {
305       this.q = q;
306       this.elements = elements;
307     }
308 
309     @Override
call()310     public @Nullable Void call() throws InterruptedException {
311       try {
312         beganProducing.countDown();
313         for (int i = 0; i < elements; i++) {
314           q.put(new Object());
315         }
316         return null;
317       } finally {
318         doneProducing.countDown();
319       }
320     }
321   }
322 
323   private static class Interrupter implements Runnable {
324     final Thread threadToInterrupt;
325 
Interrupter(Thread threadToInterrupt)326     Interrupter(Thread threadToInterrupt) {
327       this.threadToInterrupt = threadToInterrupt;
328     }
329 
330     @Override
run()331     public void run() {
332       try {
333         Thread.sleep(100);
334       } catch (InterruptedException e) {
335         throw new AssertionError();
336       } finally {
337         threadToInterrupt.interrupt();
338       }
339     }
340   }
341 }
342