1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.collect; 18 19 import static com.google.common.collect.Lists.newArrayList; 20 import static com.google.common.truth.Truth.assertThat; 21 import static java.lang.Long.MAX_VALUE; 22 import static java.lang.Thread.currentThread; 23 import static java.util.concurrent.Executors.newCachedThreadPool; 24 import static java.util.concurrent.TimeUnit.MILLISECONDS; 25 import static java.util.concurrent.TimeUnit.NANOSECONDS; 26 import static java.util.concurrent.TimeUnit.SECONDS; 27 28 import com.google.common.base.Stopwatch; 29 import java.util.Collection; 30 import java.util.List; 31 import java.util.concurrent.ArrayBlockingQueue; 32 import java.util.concurrent.BlockingQueue; 33 import java.util.concurrent.Callable; 34 import java.util.concurrent.CountDownLatch; 35 import java.util.concurrent.ExecutorService; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.LinkedBlockingDeque; 38 import java.util.concurrent.LinkedBlockingQueue; 39 import java.util.concurrent.PriorityBlockingQueue; 40 import java.util.concurrent.SynchronousQueue; 41 import java.util.concurrent.TimeUnit; 42 import junit.framework.TestCase; 43 44 /** 45 * Tests for {@link Queues}. 46 * 47 * @author Dimitris Andreou 48 */ 49 50 public class QueuesTest extends TestCase { 51 /* 52 * All the following tests relate to BlockingQueue methods in Queues. 53 */ 54 blockingQueues()55 public static List<BlockingQueue<Object>> blockingQueues() { 56 return ImmutableList.<BlockingQueue<Object>>of( 57 new LinkedBlockingQueue<Object>(), 58 new LinkedBlockingQueue<Object>(10), 59 new SynchronousQueue<Object>(), 60 new ArrayBlockingQueue<Object>(10), 61 new LinkedBlockingDeque<Object>(), 62 new LinkedBlockingDeque<Object>(10), 63 new PriorityBlockingQueue<Object>(10, Ordering.arbitrary())); 64 } 65 66 /* 67 * We need to perform operations in a thread pool, even for simple cases, because the queue might 68 * be a SynchronousQueue. 69 */ 70 private ExecutorService threadPool; 71 72 @Override setUp()73 public void setUp() { 74 threadPool = newCachedThreadPool(); 75 } 76 77 @Override tearDown()78 public void tearDown() throws InterruptedException { 79 threadPool.shutdown(); 80 assertTrue("Some worker didn't finish in time", threadPool.awaitTermination(10, SECONDS)); 81 } 82 drain( BlockingQueue<T> q, Collection<? super T> buffer, int maxElements, long timeout, TimeUnit unit, boolean interruptibly)83 private static <T> int drain( 84 BlockingQueue<T> q, 85 Collection<? super T> buffer, 86 int maxElements, 87 long timeout, 88 TimeUnit unit, 89 boolean interruptibly) 90 throws InterruptedException { 91 return interruptibly 92 ? Queues.drain(q, buffer, maxElements, timeout, unit) 93 : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit); 94 } 95 testMultipleProducers()96 public void testMultipleProducers() throws Exception { 97 for (BlockingQueue<Object> q : blockingQueues()) { 98 testMultipleProducers(q); 99 } 100 } 101 testMultipleProducers(BlockingQueue<Object> q)102 private void testMultipleProducers(BlockingQueue<Object> q) throws InterruptedException { 103 for (boolean interruptibly : new boolean[] {true, false}) { 104 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 105 Future<?> possiblyIgnoredError = threadPool.submit(new Producer(q, 20)); 106 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 107 Future<?> possiblyIgnoredError1 = threadPool.submit(new Producer(q, 20)); 108 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 109 Future<?> possiblyIgnoredError2 = threadPool.submit(new Producer(q, 20)); 110 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 111 Future<?> possiblyIgnoredError3 = threadPool.submit(new Producer(q, 20)); 112 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 113 Future<?> possiblyIgnoredError4 = threadPool.submit(new Producer(q, 20)); 114 115 List<Object> buf = newArrayList(); 116 int elements = drain(q, buf, 100, MAX_VALUE, NANOSECONDS, interruptibly); 117 assertEquals(100, elements); 118 assertEquals(100, buf.size()); 119 assertDrained(q); 120 } 121 } 122 testDrainTimesOut()123 public void testDrainTimesOut() throws Exception { 124 for (BlockingQueue<Object> q : blockingQueues()) { 125 testDrainTimesOut(q); 126 } 127 } 128 testDrainTimesOut(BlockingQueue<Object> q)129 private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception { 130 for (boolean interruptibly : new boolean[] {true, false}) { 131 assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, MILLISECONDS)); 132 133 Producer producer = new Producer(q, 1); 134 // producing one, will ask for two 135 Future<?> producerThread = threadPool.submit(producer); 136 producer.beganProducing.await(); 137 138 // make sure we time out 139 Stopwatch timer = Stopwatch.createStarted(); 140 141 int drained = drain(q, newArrayList(), 2, 10, MILLISECONDS, interruptibly); 142 assertThat(drained).isAtMost(1); 143 144 assertThat(timer.elapsed(MILLISECONDS)).isAtLeast(10L); 145 146 // If even the first one wasn't there, clean up so that the next test doesn't see an element. 147 producerThread.cancel(true); 148 producer.doneProducing.await(); 149 if (drained == 0) { 150 q.poll(); // not necessarily there if producer was interrupted 151 } 152 } 153 } 154 testZeroElements()155 public void testZeroElements() throws Exception { 156 for (BlockingQueue<Object> q : blockingQueues()) { 157 testZeroElements(q); 158 } 159 } 160 testZeroElements(BlockingQueue<Object> q)161 private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException { 162 for (boolean interruptibly : new boolean[] {true, false}) { 163 // asking to drain zero elements 164 assertEquals(0, drain(q, ImmutableList.of(), 0, 10, MILLISECONDS, interruptibly)); 165 } 166 } 167 testEmpty()168 public void testEmpty() throws Exception { 169 for (BlockingQueue<Object> q : blockingQueues()) { 170 testEmpty(q); 171 } 172 } 173 testEmpty(BlockingQueue<Object> q)174 private void testEmpty(BlockingQueue<Object> q) { 175 assertDrained(q); 176 } 177 testNegativeMaxElements()178 public void testNegativeMaxElements() throws Exception { 179 for (BlockingQueue<Object> q : blockingQueues()) { 180 testNegativeMaxElements(q); 181 } 182 } 183 testNegativeMaxElements(BlockingQueue<Object> q)184 private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException { 185 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 186 Future<?> possiblyIgnoredError = threadPool.submit(new Producer(q, 1)); 187 188 List<Object> buf = newArrayList(); 189 int elements = Queues.drain(q, buf, -1, MAX_VALUE, NANOSECONDS); 190 assertEquals(0, elements); 191 assertThat(buf).isEmpty(); 192 193 // Free the producer thread, and give subsequent tests a clean slate. 194 q.take(); 195 } 196 testDrain_throws()197 public void testDrain_throws() throws Exception { 198 for (BlockingQueue<Object> q : blockingQueues()) { 199 testDrain_throws(q); 200 } 201 } 202 testDrain_throws(BlockingQueue<Object> q)203 private void testDrain_throws(BlockingQueue<Object> q) { 204 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 205 Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread())); 206 try { 207 Queues.drain(q, ImmutableList.of(), 100, MAX_VALUE, NANOSECONDS); 208 fail(); 209 } catch (InterruptedException expected) { 210 } 211 } 212 testDrainUninterruptibly_doesNotThrow()213 public void testDrainUninterruptibly_doesNotThrow() throws Exception { 214 for (BlockingQueue<Object> q : blockingQueues()) { 215 testDrainUninterruptibly_doesNotThrow(q); 216 } 217 } 218 testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q)219 private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) { 220 final Thread mainThread = currentThread(); 221 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 222 Future<?> possiblyIgnoredError = 223 threadPool.submit( 224 new Callable<Void>() { 225 public Void call() throws InterruptedException { 226 new Producer(q, 50).call(); 227 new Interrupter(mainThread).run(); 228 new Producer(q, 50).call(); 229 return null; 230 } 231 }); 232 List<Object> buf = newArrayList(); 233 int elements = Queues.drainUninterruptibly(q, buf, 100, MAX_VALUE, NANOSECONDS); 234 // so when this drains all elements, we know the thread has also been interrupted in between 235 assertTrue(Thread.interrupted()); 236 assertEquals(100, elements); 237 assertEquals(100, buf.size()); 238 } 239 testNewLinkedBlockingDequeCapacity()240 public void testNewLinkedBlockingDequeCapacity() { 241 try { 242 Queues.newLinkedBlockingDeque(0); 243 fail("Should have thrown IllegalArgumentException"); 244 } catch (IllegalArgumentException expected) { 245 // any capacity less than 1 should throw IllegalArgumentException 246 } 247 assertEquals(1, Queues.newLinkedBlockingDeque(1).remainingCapacity()); 248 assertEquals(11, Queues.newLinkedBlockingDeque(11).remainingCapacity()); 249 } 250 testNewLinkedBlockingQueueCapacity()251 public void testNewLinkedBlockingQueueCapacity() { 252 try { 253 Queues.newLinkedBlockingQueue(0); 254 fail("Should have thrown IllegalArgumentException"); 255 } catch (IllegalArgumentException expected) { 256 // any capacity less than 1 should throw IllegalArgumentException 257 } 258 assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity()); 259 assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity()); 260 } 261 262 /** Checks that #drain() invocations behave correctly for a drained (empty) queue. */ assertDrained(BlockingQueue<Object> q)263 private void assertDrained(BlockingQueue<Object> q) { 264 assertNull(q.peek()); 265 assertInterruptibleDrained(q); 266 assertUninterruptibleDrained(q); 267 } 268 assertInterruptibleDrained(BlockingQueue<Object> q)269 private void assertInterruptibleDrained(BlockingQueue<Object> q) { 270 // nothing to drain, thus this should wait doing nothing 271 try { 272 assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, MILLISECONDS)); 273 } catch (InterruptedException e) { 274 throw new AssertionError(); 275 } 276 277 // but does the wait actually occurs? 278 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 279 Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread())); 280 try { 281 // if waiting works, this should get stuck 282 Queues.drain(q, newArrayList(), 1, MAX_VALUE, NANOSECONDS); 283 fail(); 284 } catch (InterruptedException expected) { 285 // we indeed waited; a slow thread had enough time to interrupt us 286 } 287 } 288 289 // same as above; uninterruptible version assertUninterruptibleDrained(BlockingQueue<Object> q)290 private void assertUninterruptibleDrained(BlockingQueue<Object> q) { 291 assertEquals(0, Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, MILLISECONDS)); 292 293 // but does the wait actually occurs? 294 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 295 Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread())); 296 297 Stopwatch timer = Stopwatch.createStarted(); 298 Queues.drainUninterruptibly(q, newArrayList(), 1, 10, MILLISECONDS); 299 assertThat(timer.elapsed(MILLISECONDS)).isAtLeast(10L); 300 // wait for interrupted status and clear it 301 while (!Thread.interrupted()) { 302 Thread.yield(); 303 } 304 } 305 306 private static class Producer implements Callable<Void> { 307 final BlockingQueue<Object> q; 308 final int elements; 309 final CountDownLatch beganProducing = new CountDownLatch(1); 310 final CountDownLatch doneProducing = new CountDownLatch(1); 311 Producer(BlockingQueue<Object> q, int elements)312 Producer(BlockingQueue<Object> q, int elements) { 313 this.q = q; 314 this.elements = elements; 315 } 316 317 @Override call()318 public Void call() throws InterruptedException { 319 try { 320 beganProducing.countDown(); 321 for (int i = 0; i < elements; i++) { 322 q.put(new Object()); 323 } 324 return null; 325 } finally { 326 doneProducing.countDown(); 327 } 328 } 329 } 330 331 private static class Interrupter implements Runnable { 332 final Thread threadToInterrupt; 333 Interrupter(Thread threadToInterrupt)334 Interrupter(Thread threadToInterrupt) { 335 this.threadToInterrupt = threadToInterrupt; 336 } 337 338 @Override run()339 public void run() { 340 try { 341 Thread.sleep(100); 342 } catch (InterruptedException e) { 343 throw new AssertionError(); 344 } finally { 345 threadToInterrupt.interrupt(); 346 } 347 } 348 } 349 } 350