• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.collect;
18 
19 import static com.google.common.collect.Lists.newArrayList;
20 import static com.google.common.truth.Truth.assertThat;
21 import static java.lang.Long.MAX_VALUE;
22 import static java.lang.Thread.currentThread;
23 import static java.util.concurrent.Executors.newCachedThreadPool;
24 import static java.util.concurrent.TimeUnit.MILLISECONDS;
25 import static java.util.concurrent.TimeUnit.NANOSECONDS;
26 import static java.util.concurrent.TimeUnit.SECONDS;
27 
28 import com.google.common.base.Stopwatch;
29 import java.util.Collection;
30 import java.util.List;
31 import java.util.concurrent.ArrayBlockingQueue;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.LinkedBlockingDeque;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.PriorityBlockingQueue;
40 import java.util.concurrent.SynchronousQueue;
41 import java.util.concurrent.TimeUnit;
42 import junit.framework.TestCase;
43 
44 /**
45  * Tests for {@link Queues}.
46  *
47  * @author Dimitris Andreou
48  */
49 
50 public class QueuesTest extends TestCase {
51   /*
52    * All the following tests relate to BlockingQueue methods in Queues.
53    */
54 
blockingQueues()55   public static List<BlockingQueue<Object>> blockingQueues() {
56     return ImmutableList.<BlockingQueue<Object>>of(
57         new LinkedBlockingQueue<Object>(),
58         new LinkedBlockingQueue<Object>(10),
59         new SynchronousQueue<Object>(),
60         new ArrayBlockingQueue<Object>(10),
61         new LinkedBlockingDeque<Object>(),
62         new LinkedBlockingDeque<Object>(10),
63         new PriorityBlockingQueue<Object>(10, Ordering.arbitrary()));
64   }
65 
66   /*
67    * We need to perform operations in a thread pool, even for simple cases, because the queue might
68    * be a SynchronousQueue.
69    */
70   private ExecutorService threadPool;
71 
72   @Override
setUp()73   public void setUp() {
74     threadPool = newCachedThreadPool();
75   }
76 
77   @Override
tearDown()78   public void tearDown() throws InterruptedException {
79     threadPool.shutdown();
80     assertTrue("Some worker didn't finish in time", threadPool.awaitTermination(10, SECONDS));
81   }
82 
drain( BlockingQueue<T> q, Collection<? super T> buffer, int maxElements, long timeout, TimeUnit unit, boolean interruptibly)83   private static <T> int drain(
84       BlockingQueue<T> q,
85       Collection<? super T> buffer,
86       int maxElements,
87       long timeout,
88       TimeUnit unit,
89       boolean interruptibly)
90       throws InterruptedException {
91     return interruptibly
92         ? Queues.drain(q, buffer, maxElements, timeout, unit)
93         : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit);
94   }
95 
testMultipleProducers()96   public void testMultipleProducers() throws Exception {
97     for (BlockingQueue<Object> q : blockingQueues()) {
98       testMultipleProducers(q);
99     }
100   }
101 
testMultipleProducers(BlockingQueue<Object> q)102   private void testMultipleProducers(BlockingQueue<Object> q) throws InterruptedException {
103     for (boolean interruptibly : new boolean[] {true, false}) {
104       @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
105       Future<?> possiblyIgnoredError = threadPool.submit(new Producer(q, 20));
106       @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
107       Future<?> possiblyIgnoredError1 = threadPool.submit(new Producer(q, 20));
108       @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
109       Future<?> possiblyIgnoredError2 = threadPool.submit(new Producer(q, 20));
110       @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
111       Future<?> possiblyIgnoredError3 = threadPool.submit(new Producer(q, 20));
112       @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
113       Future<?> possiblyIgnoredError4 = threadPool.submit(new Producer(q, 20));
114 
115       List<Object> buf = newArrayList();
116       int elements = drain(q, buf, 100, MAX_VALUE, NANOSECONDS, interruptibly);
117       assertEquals(100, elements);
118       assertEquals(100, buf.size());
119       assertDrained(q);
120     }
121   }
122 
testDrainTimesOut()123   public void testDrainTimesOut() throws Exception {
124     for (BlockingQueue<Object> q : blockingQueues()) {
125       testDrainTimesOut(q);
126     }
127   }
128 
testDrainTimesOut(BlockingQueue<Object> q)129   private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception {
130     for (boolean interruptibly : new boolean[] {true, false}) {
131       assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, MILLISECONDS));
132 
133       Producer producer = new Producer(q, 1);
134       // producing one, will ask for two
135       Future<?> producerThread = threadPool.submit(producer);
136       producer.beganProducing.await();
137 
138       // make sure we time out
139       Stopwatch timer = Stopwatch.createStarted();
140 
141       int drained = drain(q, newArrayList(), 2, 10, MILLISECONDS, interruptibly);
142       assertThat(drained).isAtMost(1);
143 
144       assertThat(timer.elapsed(MILLISECONDS)).isAtLeast(10L);
145 
146       // If even the first one wasn't there, clean up so that the next test doesn't see an element.
147       producerThread.cancel(true);
148       producer.doneProducing.await();
149       if (drained == 0) {
150         q.poll(); // not necessarily there if producer was interrupted
151       }
152     }
153   }
154 
testZeroElements()155   public void testZeroElements() throws Exception {
156     for (BlockingQueue<Object> q : blockingQueues()) {
157       testZeroElements(q);
158     }
159   }
160 
testZeroElements(BlockingQueue<Object> q)161   private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException {
162     for (boolean interruptibly : new boolean[] {true, false}) {
163       // asking to drain zero elements
164       assertEquals(0, drain(q, ImmutableList.of(), 0, 10, MILLISECONDS, interruptibly));
165     }
166   }
167 
testEmpty()168   public void testEmpty() throws Exception {
169     for (BlockingQueue<Object> q : blockingQueues()) {
170       testEmpty(q);
171     }
172   }
173 
testEmpty(BlockingQueue<Object> q)174   private void testEmpty(BlockingQueue<Object> q) {
175     assertDrained(q);
176   }
177 
testNegativeMaxElements()178   public void testNegativeMaxElements() throws Exception {
179     for (BlockingQueue<Object> q : blockingQueues()) {
180       testNegativeMaxElements(q);
181     }
182   }
183 
testNegativeMaxElements(BlockingQueue<Object> q)184   private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException {
185     @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
186     Future<?> possiblyIgnoredError = threadPool.submit(new Producer(q, 1));
187 
188     List<Object> buf = newArrayList();
189     int elements = Queues.drain(q, buf, -1, MAX_VALUE, NANOSECONDS);
190     assertEquals(0, elements);
191     assertThat(buf).isEmpty();
192 
193     // Free the producer thread, and give subsequent tests a clean slate.
194     q.take();
195   }
196 
testDrain_throws()197   public void testDrain_throws() throws Exception {
198     for (BlockingQueue<Object> q : blockingQueues()) {
199       testDrain_throws(q);
200     }
201   }
202 
testDrain_throws(BlockingQueue<Object> q)203   private void testDrain_throws(BlockingQueue<Object> q) {
204     @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
205     Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread()));
206     try {
207       Queues.drain(q, ImmutableList.of(), 100, MAX_VALUE, NANOSECONDS);
208       fail();
209     } catch (InterruptedException expected) {
210     }
211   }
212 
testDrainUninterruptibly_doesNotThrow()213   public void testDrainUninterruptibly_doesNotThrow() throws Exception {
214     for (BlockingQueue<Object> q : blockingQueues()) {
215       testDrainUninterruptibly_doesNotThrow(q);
216     }
217   }
218 
testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q)219   private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) {
220     final Thread mainThread = currentThread();
221     @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
222     Future<?> possiblyIgnoredError =
223         threadPool.submit(
224             new Callable<Void>() {
225               public Void call() throws InterruptedException {
226                 new Producer(q, 50).call();
227                 new Interrupter(mainThread).run();
228                 new Producer(q, 50).call();
229                 return null;
230               }
231             });
232     List<Object> buf = newArrayList();
233     int elements = Queues.drainUninterruptibly(q, buf, 100, MAX_VALUE, NANOSECONDS);
234     // so when this drains all elements, we know the thread has also been interrupted in between
235     assertTrue(Thread.interrupted());
236     assertEquals(100, elements);
237     assertEquals(100, buf.size());
238   }
239 
testNewLinkedBlockingDequeCapacity()240   public void testNewLinkedBlockingDequeCapacity() {
241     try {
242       Queues.newLinkedBlockingDeque(0);
243       fail("Should have thrown IllegalArgumentException");
244     } catch (IllegalArgumentException expected) {
245       // any capacity less than 1 should throw IllegalArgumentException
246     }
247     assertEquals(1, Queues.newLinkedBlockingDeque(1).remainingCapacity());
248     assertEquals(11, Queues.newLinkedBlockingDeque(11).remainingCapacity());
249   }
250 
testNewLinkedBlockingQueueCapacity()251   public void testNewLinkedBlockingQueueCapacity() {
252     try {
253       Queues.newLinkedBlockingQueue(0);
254       fail("Should have thrown IllegalArgumentException");
255     } catch (IllegalArgumentException expected) {
256       // any capacity less than 1 should throw IllegalArgumentException
257     }
258     assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity());
259     assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity());
260   }
261 
262   /** Checks that #drain() invocations behave correctly for a drained (empty) queue. */
assertDrained(BlockingQueue<Object> q)263   private void assertDrained(BlockingQueue<Object> q) {
264     assertNull(q.peek());
265     assertInterruptibleDrained(q);
266     assertUninterruptibleDrained(q);
267   }
268 
assertInterruptibleDrained(BlockingQueue<Object> q)269   private void assertInterruptibleDrained(BlockingQueue<Object> q) {
270     // nothing to drain, thus this should wait doing nothing
271     try {
272       assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, MILLISECONDS));
273     } catch (InterruptedException e) {
274       throw new AssertionError();
275     }
276 
277     // but does the wait actually occurs?
278     @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
279     Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread()));
280     try {
281       // if waiting works, this should get stuck
282       Queues.drain(q, newArrayList(), 1, MAX_VALUE, NANOSECONDS);
283       fail();
284     } catch (InterruptedException expected) {
285       // we indeed waited; a slow thread had enough time to interrupt us
286     }
287   }
288 
289   // same as above; uninterruptible version
assertUninterruptibleDrained(BlockingQueue<Object> q)290   private void assertUninterruptibleDrained(BlockingQueue<Object> q) {
291     assertEquals(0, Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, MILLISECONDS));
292 
293     // but does the wait actually occurs?
294     @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored
295     Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread()));
296 
297     Stopwatch timer = Stopwatch.createStarted();
298     Queues.drainUninterruptibly(q, newArrayList(), 1, 10, MILLISECONDS);
299     assertThat(timer.elapsed(MILLISECONDS)).isAtLeast(10L);
300     // wait for interrupted status and clear it
301     while (!Thread.interrupted()) {
302       Thread.yield();
303     }
304   }
305 
306   private static class Producer implements Callable<Void> {
307     final BlockingQueue<Object> q;
308     final int elements;
309     final CountDownLatch beganProducing = new CountDownLatch(1);
310     final CountDownLatch doneProducing = new CountDownLatch(1);
311 
Producer(BlockingQueue<Object> q, int elements)312     Producer(BlockingQueue<Object> q, int elements) {
313       this.q = q;
314       this.elements = elements;
315     }
316 
317     @Override
call()318     public Void call() throws InterruptedException {
319       try {
320         beganProducing.countDown();
321         for (int i = 0; i < elements; i++) {
322           q.put(new Object());
323         }
324         return null;
325       } finally {
326         doneProducing.countDown();
327       }
328     }
329   }
330 
331   private static class Interrupter implements Runnable {
332     final Thread threadToInterrupt;
333 
Interrupter(Thread threadToInterrupt)334     Interrupter(Thread threadToInterrupt) {
335       this.threadToInterrupt = threadToInterrupt;
336     }
337 
338     @Override
run()339     public void run() {
340       try {
341         Thread.sleep(100);
342       } catch (InterruptedException e) {
343         throw new AssertionError();
344       } finally {
345         threadToInterrupt.interrupt();
346       }
347     }
348   }
349 }
350