• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 
15 package com.google.common.collect;
16 
17 import com.google.common.annotations.Beta;
18 import com.google.common.base.Preconditions;
19 
20 import java.util.ArrayDeque;
21 import java.util.Collection;
22 import java.util.PriorityQueue;
23 import java.util.Queue;
24 import java.util.concurrent.ArrayBlockingQueue;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27 import java.util.concurrent.LinkedBlockingDeque;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.PriorityBlockingQueue;
30 import java.util.concurrent.SynchronousQueue;
31 import java.util.concurrent.TimeUnit;
32 
33 /**
34  * Static utility methods pertaining to {@link Queue}
35  * instances. Also see this class's counterparts
36  * {@link Lists}, {@link Sets}, and {@link Maps}.
37  *
38  * @author Kurt Alfred Kluever
39  * @since 11.0
40  */
41 @Beta
42 public final class Queues {
Queues()43   private Queues() {}
44 
45   // ArrayBlockingQueue
46 
47   /**
48    * Creates an empty {@code ArrayBlockingQueue} instance.
49    *
50    * @return a new, empty {@code ArrayBlockingQueue}
51    */
newArrayBlockingQueue(int capacity)52   public static <E> ArrayBlockingQueue<E> newArrayBlockingQueue(int capacity) {
53     return new ArrayBlockingQueue<E>(capacity);
54   }
55 
56   // ArrayDeque
57 
58   // ConcurrentLinkedQueue
59 
60   /**
61    * Creates an empty {@code ConcurrentLinkedQueue} instance.
62    *
63    * @return a new, empty {@code ConcurrentLinkedQueue}
64    */
newConcurrentLinkedQueue()65   public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue() {
66     return new ConcurrentLinkedQueue<E>();
67   }
68 
69   /**
70    * Creates an {@code ConcurrentLinkedQueue} instance containing the given elements.
71    *
72    * @param elements the elements that the queue should contain, in order
73    * @return a new {@code ConcurrentLinkedQueue} containing those elements
74    */
newConcurrentLinkedQueue( Iterable<? extends E> elements)75   public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue(
76       Iterable<? extends E> elements) {
77     if (elements instanceof Collection) {
78       return new ConcurrentLinkedQueue<E>(Collections2.cast(elements));
79     }
80     ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
81     Iterables.addAll(queue, elements);
82     return queue;
83   }
84 
85   // LinkedBlockingDeque
86 
87   // LinkedBlockingQueue
88 
89   /**
90    * Creates an empty {@code LinkedBlockingQueue} instance.
91    *
92    * @return a new, empty {@code LinkedBlockingQueue}
93    */
newLinkedBlockingQueue()94   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue() {
95     return new LinkedBlockingQueue<E>();
96   }
97 
98   /**
99    * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
100    *
101    * @param capacity the capacity of this queue
102    * @return a new, empty {@code LinkedBlockingQueue}
103    * @throws IllegalArgumentException if {@code capacity} is less than 1
104    */
newLinkedBlockingQueue(int capacity)105   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(int capacity) {
106     return new LinkedBlockingQueue<E>(capacity);
107   }
108 
109   /**
110    * Creates an {@code LinkedBlockingQueue} instance containing the given elements.
111    *
112    * @param elements the elements that the queue should contain, in order
113    * @return a new {@code LinkedBlockingQueue} containing those elements
114    */
newLinkedBlockingQueue(Iterable<? extends E> elements)115   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(Iterable<? extends E> elements) {
116     if (elements instanceof Collection) {
117       return new LinkedBlockingQueue<E>(Collections2.cast(elements));
118     }
119     LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>();
120     Iterables.addAll(queue, elements);
121     return queue;
122   }
123 
124   // LinkedList: see {@link com.google.common.collect.Lists}
125 
126   // PriorityBlockingQueue
127 
128   /**
129    * Creates an empty {@code PriorityBlockingQueue} instance.
130    *
131    * @return a new, empty {@code PriorityBlockingQueue}
132    */
newPriorityBlockingQueue()133   public static <E> PriorityBlockingQueue<E> newPriorityBlockingQueue() {
134     return new PriorityBlockingQueue<E>();
135   }
136 
137   /**
138    * Creates an {@code PriorityBlockingQueue} instance containing the given elements.
139    *
140    * @param elements the elements that the queue should contain, in order
141    * @return a new {@code PriorityBlockingQueue} containing those elements
142    */
newPriorityBlockingQueue( Iterable<? extends E> elements)143   public static <E> PriorityBlockingQueue<E> newPriorityBlockingQueue(
144       Iterable<? extends E> elements) {
145     if (elements instanceof Collection) {
146       return new PriorityBlockingQueue<E>(Collections2.cast(elements));
147     }
148     PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<E>();
149     Iterables.addAll(queue, elements);
150     return queue;
151   }
152 
153   // PriorityQueue
154 
155   /**
156    * Creates an empty {@code PriorityQueue} instance.
157    *
158    * @return a new, empty {@code PriorityQueue}
159    */
newPriorityQueue()160   public static <E> PriorityQueue<E> newPriorityQueue() {
161     return new PriorityQueue<E>();
162   }
163 
164   /**
165    * Creates an {@code PriorityQueue} instance containing the given elements.
166    *
167    * @param elements the elements that the queue should contain, in order
168    * @return a new {@code PriorityQueue} containing those elements
169    */
newPriorityQueue(Iterable<? extends E> elements)170   public static <E> PriorityQueue<E> newPriorityQueue(Iterable<? extends E> elements) {
171     if (elements instanceof Collection) {
172       return new PriorityQueue<E>(Collections2.cast(elements));
173     }
174     PriorityQueue<E> queue = new PriorityQueue<E>();
175     Iterables.addAll(queue, elements);
176     return queue;
177   }
178 
179   // SynchronousQueue
180 
181   /**
182    * Creates an empty {@code SynchronousQueue} instance.
183    *
184    * @return a new, empty {@code SynchronousQueue}
185    */
newSynchronousQueue()186   public static <E> SynchronousQueue<E> newSynchronousQueue() {
187     return new SynchronousQueue<E>();
188   }
189 
190   /**
191    * Drains the queue as {@link BlockingQueue#drainTo(Collection, int)}, but if the requested
192    * {@code numElements} elements are not available, it will wait for them up to the specified
193    * timeout.
194    *
195    * @param q the blocking queue to be drained
196    * @param buffer where to add the transferred elements
197    * @param numElements the number of elements to be waited for
198    * @param timeout how long to wait before giving up, in units of {@code unit}
199    * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
200    * @return the number of elements transferred
201    * @throws InterruptedException if interrupted while waiting
202    */
drain(BlockingQueue<E> q, Collection<? super E> buffer, int numElements, long timeout, TimeUnit unit)203   public static <E> int drain(BlockingQueue<E> q, Collection<? super E> buffer, int numElements,
204       long timeout, TimeUnit unit) throws InterruptedException {
205     Preconditions.checkNotNull(buffer);
206     /*
207      * This code performs one System.nanoTime() more than necessary, and in return, the time to
208      * execute Queue#drainTo is not added *on top* of waiting for the timeout (which could make
209      * the timeout arbitrarily inaccurate, given a queue that is slow to drain).
210      */
211     long deadline = System.nanoTime() + unit.toNanos(timeout);
212     int added = 0;
213     while (added < numElements) {
214       // we could rely solely on #poll, but #drainTo might be more efficient when there are multiple
215       // elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
216       added += q.drainTo(buffer, numElements - added);
217       if (added < numElements) { // not enough elements immediately available; will have to poll
218         E e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
219         if (e == null) {
220           break; // we already waited enough, and there are no more elements in sight
221         }
222         buffer.add(e);
223         added++;
224       }
225     }
226     return added;
227   }
228 
229   /**
230    * Drains the queue as {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)},
231    * but with a different behavior in case it is interrupted while waiting. In that case, the
232    * operation will continue as usual, and in the end the thread's interruption status will be set
233    * (no {@code InterruptedException} is thrown).
234    *
235    * @param q the blocking queue to be drained
236    * @param buffer where to add the transferred elements
237    * @param numElements the number of elements to be waited for
238    * @param timeout how long to wait before giving up, in units of {@code unit}
239    * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
240    * @return the number of elements transferred
241    */
drainUninterruptibly(BlockingQueue<E> q, Collection<? super E> buffer, int numElements, long timeout, TimeUnit unit)242   public static <E> int drainUninterruptibly(BlockingQueue<E> q, Collection<? super E> buffer,
243       int numElements, long timeout, TimeUnit unit) {
244     Preconditions.checkNotNull(buffer);
245     long deadline = System.nanoTime() + unit.toNanos(timeout);
246     int added = 0;
247     boolean interrupted = false;
248     try {
249       while (added < numElements) {
250         // we could rely solely on #poll, but #drainTo might be more efficient when there are
251         // multiple elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
252         added += q.drainTo(buffer, numElements - added);
253         if (added < numElements) { // not enough elements immediately available; will have to poll
254           E e; // written exactly once, by a successful (uninterrupted) invocation of #poll
255           while (true) {
256             try {
257               e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
258               break;
259             } catch (InterruptedException ex) {
260               interrupted = true; // note interruption and retry
261             }
262           }
263           if (e == null) {
264             break; // we already waited enough, and there are no more elements in sight
265           }
266           buffer.add(e);
267           added++;
268         }
269       }
270     } finally {
271       if (interrupted) {
272         Thread.currentThread().interrupt();
273       }
274     }
275     return added;
276   }
277 }
278