• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.collect;
18 
19 import com.google.common.util.concurrent.Uninterruptibles;
20 
21 import junit.framework.TestCase;
22 
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.PriorityBlockingQueue;
32 import java.util.concurrent.SynchronousQueue;
33 import java.util.concurrent.TimeUnit;
34 
35 /**
36  * Tests for {@link Queues}.
37  *
38  * @author Dimitris Andreou
39  */
40 
41 public class QueuesTest extends TestCase {
42   /*
43    * All the following tests relate to BlockingQueue methods in Queues.
44    */
45 
blockingQueues()46   public static List<BlockingQueue<Object>> blockingQueues() {
47     return ImmutableList.<BlockingQueue<Object>>of(
48         new LinkedBlockingQueue<Object>(),
49         new LinkedBlockingQueue<Object>(10),
50         new SynchronousQueue<Object>(),
51         new ArrayBlockingQueue<Object>(10),
52         new PriorityBlockingQueue<Object>(10, Ordering.arbitrary()));
53   }
54 
55   private ExecutorService threadPool;
56 
57   @Override
setUp()58   public void setUp() {
59     threadPool = Executors.newCachedThreadPool();
60   }
61 
62   @Override
tearDown()63   public void tearDown() throws InterruptedException {
64     // notice that if a Producer is interrupted (a bug), the Producer will go into an infinite
65     // loop, which will be noticed here
66     threadPool.shutdown();
67     assertTrue("Some worker didn't finish in time",
68         threadPool.awaitTermination(1, TimeUnit.SECONDS));
69   }
70 
drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements, long timeout, TimeUnit unit, boolean interruptibly)71   private static <T> int drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements,
72       long timeout, TimeUnit unit, boolean interruptibly) throws InterruptedException {
73     return interruptibly
74         ? Queues.drain(q, buffer, maxElements, timeout, unit)
75         : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit);
76   }
77 
testMultipleProducers()78   public void testMultipleProducers() throws Exception {
79     for (BlockingQueue<Object> q : blockingQueues()) {
80       testMultipleProducers(q);
81     }
82   }
83 
testMultipleProducers(BlockingQueue<Object> q)84   private void testMultipleProducers(BlockingQueue<Object> q)
85       throws InterruptedException {
86     for (boolean interruptibly : new boolean[] { true, false }) {
87       threadPool.submit(new Producer(q, 20));
88       threadPool.submit(new Producer(q, 20));
89       threadPool.submit(new Producer(q, 20));
90       threadPool.submit(new Producer(q, 20));
91       threadPool.submit(new Producer(q, 20));
92 
93       List<Object> buf = Lists.newArrayList();
94       int elements = drain(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, interruptibly);
95       assertEquals(100, elements);
96       assertEquals(100, buf.size());
97       assertDrained(q);
98     }
99   }
100 
testDrainTimesOut()101   public void testDrainTimesOut() throws Exception {
102     for (BlockingQueue<Object> q : blockingQueues()) {
103       testDrainTimesOut(q);
104     }
105   }
106 
testDrainTimesOut(BlockingQueue<Object> q)107   private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception {
108     for (boolean interruptibly : new boolean[] { true, false }) {
109       assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, TimeUnit.MILLISECONDS));
110 
111       // producing one, will ask for two
112       Future<?> submitter = threadPool.submit(new Producer(q, 1));
113 
114       // make sure we time out
115       long startTime = System.nanoTime();
116 
117       int drained = drain(q, Lists.newArrayList(), 2, 10, TimeUnit.MILLISECONDS, interruptibly);
118       assertTrue(drained <= 1);
119 
120       assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
121 
122       // If even the first one wasn't there, clean up so that the next test doesn't see an element.
123       submitter.get();
124       if (drained == 0) {
125         assertNotNull(q.poll());
126       }
127     }
128   }
129 
testZeroElements()130   public void testZeroElements() throws Exception {
131     for (BlockingQueue<Object> q : blockingQueues()) {
132       testZeroElements(q);
133     }
134   }
135 
testZeroElements(BlockingQueue<Object> q)136   private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException {
137     for (boolean interruptibly : new boolean[] { true, false }) {
138       // asking to drain zero elements
139       assertEquals(0, drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS, interruptibly));
140     }
141   }
142 
testEmpty()143   public void testEmpty() throws Exception {
144     for (BlockingQueue<Object> q : blockingQueues()) {
145       testEmpty(q);
146     }
147   }
148 
testEmpty(BlockingQueue<Object> q)149   private void testEmpty(BlockingQueue<Object> q) {
150     assertDrained(q);
151   }
152 
testNegativeMaxElements()153   public void testNegativeMaxElements() throws Exception {
154     for (BlockingQueue<Object> q : blockingQueues()) {
155       testNegativeMaxElements(q);
156     }
157   }
158 
testNegativeMaxElements(BlockingQueue<Object> q)159   private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException {
160     threadPool.submit(new Producer(q, 1));
161 
162     List<Object> buf = Lists.newArrayList();
163     int elements = Queues.drain(q, buf, -1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
164     assertEquals(elements, 0);
165     assertTrue(buf.isEmpty());
166 
167     // Clean up produced element to free the producer thread, otherwise it will complain
168     // when we shutdown the threadpool.
169     Queues.drain(q, buf, 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
170   }
171 
testDrain_throws()172   public void testDrain_throws() throws Exception {
173     for (BlockingQueue<Object> q : blockingQueues()) {
174       testDrain_throws(q);
175     }
176   }
177 
testDrain_throws(BlockingQueue<Object> q)178   private void testDrain_throws(BlockingQueue<Object> q) {
179     threadPool.submit(new Interrupter(Thread.currentThread()));
180     try {
181       Queues.drain(q, ImmutableList.of(), 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
182       fail();
183     } catch (InterruptedException expected) {
184     }
185   }
186 
testDrainUninterruptibly_doesNotThrow()187   public void testDrainUninterruptibly_doesNotThrow() throws Exception {
188     for (BlockingQueue<Object> q : blockingQueues()) {
189       testDrainUninterruptibly_doesNotThrow(q);
190     }
191   }
192 
testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q)193   private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) {
194     final Thread mainThread = Thread.currentThread();
195     threadPool.submit(new Runnable() {
196       public void run() {
197         new Producer(q, 50).run();
198         new Interrupter(mainThread).run();
199         new Producer(q, 50).run();
200       }
201     });
202     List<Object> buf = Lists.newArrayList();
203     int elements =
204         Queues.drainUninterruptibly(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
205     // so when this drains all elements, we know the thread has also been interrupted in between
206     assertTrue(Thread.interrupted());
207     assertEquals(100, elements);
208     assertEquals(100, buf.size());
209   }
210 
testNewLinkedBlockingQueueCapacity()211   public void testNewLinkedBlockingQueueCapacity() {
212     try {
213       Queues.newLinkedBlockingQueue(0);
214       fail("Should have thrown IllegalArgumentException");
215     } catch (IllegalArgumentException expected) {
216       // any capacity less than 1 should throw IllegalArgumentException
217     }
218     assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity());
219     assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity());
220   }
221 
222   /**
223    * Checks that #drain() invocations behave correctly for a drained (empty) queue.
224    */
assertDrained(BlockingQueue<Object> q)225   private void assertDrained(BlockingQueue<Object> q) {
226     assertNull(q.peek());
227     assertInterruptibleDrained(q);
228     assertUninterruptibleDrained(q);
229   }
230 
assertInterruptibleDrained(BlockingQueue<Object> q)231   private void assertInterruptibleDrained(BlockingQueue<Object> q) {
232     // nothing to drain, thus this should wait doing nothing
233     try {
234       assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
235     } catch (InterruptedException e) {
236       throw new AssertionError();
237     }
238 
239     // but does the wait actually occurs?
240     threadPool.submit(new Interrupter(Thread.currentThread()));
241     try {
242       // if waiting works, this should get stuck
243       Queues.drain(q, Lists.newArrayList(), 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
244       fail();
245     } catch (InterruptedException expected) {
246       // we indeed waited; a slow thread had enough time to interrupt us
247     }
248   }
249 
250   // same as above; uninterruptible version
assertUninterruptibleDrained(BlockingQueue<Object> q)251   private void assertUninterruptibleDrained(BlockingQueue<Object> q) {
252     assertEquals(0,
253         Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
254 
255     // but does the wait actually occurs?
256     threadPool.submit(new Interrupter(Thread.currentThread()));
257 
258     long startTime = System.nanoTime();
259     Queues.drainUninterruptibly(
260         q, Lists.newArrayList(), 1, 10, TimeUnit.MILLISECONDS);
261     assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
262     // wait for interrupted status and clear it
263     while (!Thread.interrupted()) { Thread.yield(); }
264   }
265 
266   private static class Producer implements Runnable {
267     final BlockingQueue<Object> q;
268     final int elements;
269 
Producer(BlockingQueue<Object> q, int elements)270     Producer(BlockingQueue<Object> q, int elements) {
271       this.q = q;
272       this.elements = elements;
273     }
274 
run()275     @Override public void run() {
276       try {
277         for (int i = 0; i < elements; i++) {
278           q.put(new Object());
279         }
280       } catch (InterruptedException e) {
281         // TODO(user): replace this when there is a better way to spawn threads in tests and
282         // have threads propagate their errors back to the test thread.
283         e.printStackTrace();
284         // never returns, so that #tearDown() notices that one worker isn't done
285         Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
286       }
287     }
288   }
289 
290   private static class Interrupter implements Runnable {
291     final Thread threadToInterrupt;
292 
Interrupter(Thread threadToInterrupt)293     Interrupter(Thread threadToInterrupt) {
294       this.threadToInterrupt = threadToInterrupt;
295     }
296 
run()297     @Override public void run() {
298       try {
299         Thread.sleep(100);
300       } catch (InterruptedException e) {
301         throw new AssertionError();
302       } finally {
303         threadToInterrupt.interrupt();
304       }
305     }
306   }
307 }
308