1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.collect; 18 19 import static com.google.common.collect.Lists.newArrayList; 20 import static com.google.common.truth.Truth.assertThat; 21 import static java.lang.Long.MAX_VALUE; 22 import static java.lang.Thread.currentThread; 23 import static java.util.concurrent.Executors.newCachedThreadPool; 24 import static java.util.concurrent.TimeUnit.MILLISECONDS; 25 import static java.util.concurrent.TimeUnit.NANOSECONDS; 26 import static java.util.concurrent.TimeUnit.SECONDS; 27 import static org.junit.Assert.assertThrows; 28 29 import com.google.common.base.Stopwatch; 30 import java.util.Collection; 31 import java.util.List; 32 import java.util.concurrent.ArrayBlockingQueue; 33 import java.util.concurrent.BlockingQueue; 34 import java.util.concurrent.Callable; 35 import java.util.concurrent.CountDownLatch; 36 import java.util.concurrent.ExecutorService; 37 import java.util.concurrent.Future; 38 import java.util.concurrent.LinkedBlockingDeque; 39 import java.util.concurrent.LinkedBlockingQueue; 40 import java.util.concurrent.PriorityBlockingQueue; 41 import java.util.concurrent.SynchronousQueue; 42 import java.util.concurrent.TimeUnit; 43 import junit.framework.TestCase; 44 import org.checkerframework.checker.nullness.qual.Nullable; 45 46 /** 47 * Tests for {@link Queues}. 48 * 49 * @author Dimitris Andreou 50 */ 51 public class QueuesTest extends TestCase { 52 /* 53 * All the following tests relate to BlockingQueue methods in Queues. 54 */ 55 blockingQueues()56 public static List<BlockingQueue<Object>> blockingQueues() { 57 return ImmutableList.<BlockingQueue<Object>>of( 58 new LinkedBlockingQueue<Object>(), 59 new LinkedBlockingQueue<Object>(10), 60 new SynchronousQueue<Object>(), 61 new ArrayBlockingQueue<Object>(10), 62 new LinkedBlockingDeque<Object>(), 63 new LinkedBlockingDeque<Object>(10), 64 new PriorityBlockingQueue<Object>(10, Ordering.arbitrary())); 65 } 66 67 /* 68 * We need to perform operations in a thread pool, even for simple cases, because the queue might 69 * be a SynchronousQueue. 70 */ 71 private ExecutorService threadPool; 72 73 @Override setUp()74 public void setUp() { 75 threadPool = newCachedThreadPool(); 76 } 77 78 @Override tearDown()79 public void tearDown() throws InterruptedException { 80 threadPool.shutdown(); 81 assertTrue("Some worker didn't finish in time", threadPool.awaitTermination(10, SECONDS)); 82 } 83 drain( BlockingQueue<T> q, Collection<? super T> buffer, int maxElements, long timeout, TimeUnit unit, boolean interruptibly)84 private static <T> int drain( 85 BlockingQueue<T> q, 86 Collection<? super T> buffer, 87 int maxElements, 88 long timeout, 89 TimeUnit unit, 90 boolean interruptibly) 91 throws InterruptedException { 92 return interruptibly 93 ? Queues.drain(q, buffer, maxElements, timeout, unit) 94 : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit); 95 } 96 testMultipleProducers()97 public void testMultipleProducers() throws Exception { 98 for (BlockingQueue<Object> q : blockingQueues()) { 99 testMultipleProducers(q); 100 } 101 } 102 testMultipleProducers(BlockingQueue<Object> q)103 private void testMultipleProducers(BlockingQueue<Object> q) throws InterruptedException { 104 for (boolean interruptibly : new boolean[] {true, false}) { 105 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 106 Future<?> possiblyIgnoredError = threadPool.submit(new Producer(q, 20)); 107 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 108 Future<?> possiblyIgnoredError1 = threadPool.submit(new Producer(q, 20)); 109 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 110 Future<?> possiblyIgnoredError2 = threadPool.submit(new Producer(q, 20)); 111 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 112 Future<?> possiblyIgnoredError3 = threadPool.submit(new Producer(q, 20)); 113 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 114 Future<?> possiblyIgnoredError4 = threadPool.submit(new Producer(q, 20)); 115 116 List<Object> buf = newArrayList(); 117 int elements = drain(q, buf, 100, MAX_VALUE, NANOSECONDS, interruptibly); 118 assertEquals(100, elements); 119 assertEquals(100, buf.size()); 120 assertDrained(q); 121 } 122 } 123 testDrainTimesOut()124 public void testDrainTimesOut() throws Exception { 125 for (BlockingQueue<Object> q : blockingQueues()) { 126 testDrainTimesOut(q); 127 } 128 } 129 testDrainTimesOut(BlockingQueue<Object> q)130 private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception { 131 for (boolean interruptibly : new boolean[] {true, false}) { 132 assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, MILLISECONDS)); 133 134 Producer producer = new Producer(q, 1); 135 // producing one, will ask for two 136 Future<?> producerThread = threadPool.submit(producer); 137 producer.beganProducing.await(); 138 139 // make sure we time out 140 Stopwatch timer = Stopwatch.createStarted(); 141 142 int drained = drain(q, newArrayList(), 2, 10, MILLISECONDS, interruptibly); 143 assertThat(drained).isAtMost(1); 144 145 assertThat(timer.elapsed(MILLISECONDS)).isAtLeast(10L); 146 147 // If even the first one wasn't there, clean up so that the next test doesn't see an element. 148 producerThread.cancel(true); 149 producer.doneProducing.await(); 150 if (drained == 0) { 151 q.poll(); // not necessarily there if producer was interrupted 152 } 153 } 154 } 155 testZeroElements()156 public void testZeroElements() throws Exception { 157 for (BlockingQueue<Object> q : blockingQueues()) { 158 testZeroElements(q); 159 } 160 } 161 testZeroElements(BlockingQueue<Object> q)162 private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException { 163 for (boolean interruptibly : new boolean[] {true, false}) { 164 // asking to drain zero elements 165 assertEquals(0, drain(q, ImmutableList.of(), 0, 10, MILLISECONDS, interruptibly)); 166 } 167 } 168 testEmpty()169 public void testEmpty() throws Exception { 170 for (BlockingQueue<Object> q : blockingQueues()) { 171 testEmpty(q); 172 } 173 } 174 testEmpty(BlockingQueue<Object> q)175 private void testEmpty(BlockingQueue<Object> q) { 176 assertDrained(q); 177 } 178 testNegativeMaxElements()179 public void testNegativeMaxElements() throws Exception { 180 for (BlockingQueue<Object> q : blockingQueues()) { 181 testNegativeMaxElements(q); 182 } 183 } 184 testNegativeMaxElements(BlockingQueue<Object> q)185 private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException { 186 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 187 Future<?> possiblyIgnoredError = threadPool.submit(new Producer(q, 1)); 188 189 List<Object> buf = newArrayList(); 190 int elements = Queues.drain(q, buf, -1, MAX_VALUE, NANOSECONDS); 191 assertEquals(0, elements); 192 assertThat(buf).isEmpty(); 193 194 // Free the producer thread, and give subsequent tests a clean slate. 195 q.take(); 196 } 197 testDrain_throws()198 public void testDrain_throws() throws Exception { 199 for (BlockingQueue<Object> q : blockingQueues()) { 200 testDrain_throws(q); 201 } 202 } 203 testDrain_throws(BlockingQueue<Object> q)204 private void testDrain_throws(BlockingQueue<Object> q) { 205 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 206 Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread())); 207 try { 208 Queues.drain(q, ImmutableList.of(), 100, MAX_VALUE, NANOSECONDS); 209 fail(); 210 } catch (InterruptedException expected) { 211 } 212 } 213 testDrainUninterruptibly_doesNotThrow()214 public void testDrainUninterruptibly_doesNotThrow() throws Exception { 215 for (BlockingQueue<Object> q : blockingQueues()) { 216 testDrainUninterruptibly_doesNotThrow(q); 217 } 218 } 219 testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q)220 private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) { 221 final Thread mainThread = currentThread(); 222 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 223 Future<?> possiblyIgnoredError = 224 threadPool.submit( 225 new Callable<@Nullable Void>() { 226 @Override 227 public @Nullable Void call() throws InterruptedException { 228 new Producer(q, 50).call(); 229 new Interrupter(mainThread).run(); 230 new Producer(q, 50).call(); 231 return null; 232 } 233 }); 234 List<Object> buf = newArrayList(); 235 int elements = Queues.drainUninterruptibly(q, buf, 100, MAX_VALUE, NANOSECONDS); 236 // so when this drains all elements, we know the thread has also been interrupted in between 237 assertTrue(Thread.interrupted()); 238 assertEquals(100, elements); 239 assertEquals(100, buf.size()); 240 } 241 testNewLinkedBlockingDequeCapacity()242 public void testNewLinkedBlockingDequeCapacity() { 243 assertThrows(IllegalArgumentException.class, () -> Queues.newLinkedBlockingDeque(0)); 244 assertEquals(1, Queues.newLinkedBlockingDeque(1).remainingCapacity()); 245 assertEquals(11, Queues.newLinkedBlockingDeque(11).remainingCapacity()); 246 } 247 testNewLinkedBlockingQueueCapacity()248 public void testNewLinkedBlockingQueueCapacity() { 249 assertThrows(IllegalArgumentException.class, () -> Queues.newLinkedBlockingQueue(0)); 250 assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity()); 251 assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity()); 252 } 253 254 /** Checks that #drain() invocations behave correctly for a drained (empty) queue. */ assertDrained(BlockingQueue<Object> q)255 private void assertDrained(BlockingQueue<Object> q) { 256 assertNull(q.peek()); 257 assertInterruptibleDrained(q); 258 assertUninterruptibleDrained(q); 259 } 260 assertInterruptibleDrained(BlockingQueue<Object> q)261 private void assertInterruptibleDrained(BlockingQueue<Object> q) { 262 // nothing to drain, thus this should wait doing nothing 263 try { 264 assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, MILLISECONDS)); 265 } catch (InterruptedException e) { 266 throw new AssertionError(); 267 } 268 269 // but does the wait actually occurs? 270 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 271 Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread())); 272 try { 273 // if waiting works, this should get stuck 274 Queues.drain(q, newArrayList(), 1, MAX_VALUE, NANOSECONDS); 275 fail(); 276 } catch (InterruptedException expected) { 277 // we indeed waited; a slow thread had enough time to interrupt us 278 } 279 } 280 281 // same as above; uninterruptible version assertUninterruptibleDrained(BlockingQueue<Object> q)282 private void assertUninterruptibleDrained(BlockingQueue<Object> q) { 283 assertEquals(0, Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, MILLISECONDS)); 284 285 // but does the wait actually occurs? 286 @SuppressWarnings("unused") // https://errorprone.info/bugpattern/FutureReturnValueIgnored 287 Future<?> possiblyIgnoredError = threadPool.submit(new Interrupter(currentThread())); 288 289 Stopwatch timer = Stopwatch.createStarted(); 290 Queues.drainUninterruptibly(q, newArrayList(), 1, 10, MILLISECONDS); 291 assertThat(timer.elapsed(MILLISECONDS)).isAtLeast(10L); 292 // wait for interrupted status and clear it 293 while (!Thread.interrupted()) { 294 Thread.yield(); 295 } 296 } 297 298 private static class Producer implements Callable<@Nullable Void> { 299 final BlockingQueue<Object> q; 300 final int elements; 301 final CountDownLatch beganProducing = new CountDownLatch(1); 302 final CountDownLatch doneProducing = new CountDownLatch(1); 303 Producer(BlockingQueue<Object> q, int elements)304 Producer(BlockingQueue<Object> q, int elements) { 305 this.q = q; 306 this.elements = elements; 307 } 308 309 @Override call()310 public @Nullable Void call() throws InterruptedException { 311 try { 312 beganProducing.countDown(); 313 for (int i = 0; i < elements; i++) { 314 q.put(new Object()); 315 } 316 return null; 317 } finally { 318 doneProducing.countDown(); 319 } 320 } 321 } 322 323 private static class Interrupter implements Runnable { 324 final Thread threadToInterrupt; 325 Interrupter(Thread threadToInterrupt)326 Interrupter(Thread threadToInterrupt) { 327 this.threadToInterrupt = threadToInterrupt; 328 } 329 330 @Override run()331 public void run() { 332 try { 333 Thread.sleep(100); 334 } catch (InterruptedException e) { 335 throw new AssertionError(); 336 } finally { 337 threadToInterrupt.interrupt(); 338 } 339 } 340 } 341 } 342