• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import com.google.common.collect.ObjectArrays;
20 import com.google.errorprone.annotations.CanIgnoreReturnValue;
21 import java.util.AbstractQueue;
22 import java.util.Collection;
23 import java.util.ConcurrentModificationException;
24 import java.util.Iterator;
25 import java.util.NoSuchElementException;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.TimeUnit;
28 import org.checkerframework.checker.nullness.qual.Nullable;
29 
30 /**
31  * A bounded {@linkplain BlockingQueue blocking queue} backed by an array. This queue orders
32  * elements FIFO (first-in-first-out). The <em>head</em> of the queue is that element that has been
33  * on the queue the longest time. The <em>tail</em> of the queue is that element that has been on
34  * the queue the shortest time. New elements are inserted at the tail of the queue, and the queue
35  * retrieval operations obtain elements at the head of the queue.
36  *
37  * <p>This is a classic &quot;bounded buffer&quot;, in which a fixed-sized array holds elements
38  * inserted by producers and extracted by consumers. Once created, the capacity cannot be increased.
39  * Attempts to {@code put} an element into a full queue will result in the operation blocking;
40  * attempts to {@code take} an element from an empty queue will similarly block.
41  *
42  * <p>This class supports an optional fairness policy for ordering waiting producer and consumer
43  * threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness
44  * set to {@code true} grants threads access in FIFO order. Fairness generally decreases throughput
45  * but reduces variability and avoids starvation.
46  *
47  * <p>This class and its iterator implement all of the <em>optional</em> methods of the {@link
48  * Collection} and {@link Iterator} interfaces.
49  *
50  * @author Doug Lea
51  * @author Justin T. Sampson
52  * @param <E> the type of elements held in this collection
53  */
54 // TODO(kak): consider removing some of the @CanIgnoreReturnValue annotations as appropriate
55 public class MonitorBasedArrayBlockingQueue<E> extends AbstractQueue<E>
56     implements BlockingQueue<E> {
57 
58   // Based on revision 1.58 of ArrayBlockingQueue by Doug Lea, from
59   // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/
60 
61   /** The queued items */
62   final E[] items;
63   /** items index for next take, poll or remove */
64   int takeIndex;
65   /** items index for next put, offer, or add. */
66   int putIndex;
67   /** Number of items in the queue */
68   private int count;
69 
70   /*
71    * Concurrency control uses the classic two-condition algorithm
72    * found in any textbook.
73    */
74 
75   /** Monitor guarding all access */
76   final Monitor monitor;
77 
78   /** Guard for waiting takes */
79   private final Monitor.Guard notEmpty;
80 
81   /** Guard for waiting puts */
82   private final Monitor.Guard notFull;
83 
84   // Internal helper methods
85 
86   /** Circularly increment i. */
inc(int i)87   final int inc(int i) {
88     return (++i == items.length) ? 0 : i;
89   }
90 
91   /**
92    * Inserts element at current put position, advances, and signals. Call only when occupying
93    * monitor.
94    */
insert(E x)95   private void insert(E x) {
96     items[putIndex] = x;
97     putIndex = inc(putIndex);
98     ++count;
99   }
100 
101   /**
102    * Extracts element at current take position, advances, and signals. Call only when occupying
103    * monitor.
104    */
extract()105   private E extract() {
106     final E[] items = this.items;
107     E x = items[takeIndex];
108     items[takeIndex] = null;
109     takeIndex = inc(takeIndex);
110     --count;
111     return x;
112   }
113 
114   /**
115    * Utility for remove and iterator.remove: Delete item at position i. Call only when occupying
116    * monitor.
117    */
removeAt(int i)118   void removeAt(int i) {
119     final E[] items = this.items;
120     // if removing front item, just advance
121     if (i == takeIndex) {
122       items[takeIndex] = null;
123       takeIndex = inc(takeIndex);
124     } else {
125       // slide over all others up through putIndex.
126       for (; ; ) {
127         int nexti = inc(i);
128         if (nexti != putIndex) {
129           items[i] = items[nexti];
130           i = nexti;
131         } else {
132           items[i] = null;
133           putIndex = i;
134           break;
135         }
136       }
137     }
138     --count;
139   }
140 
141   /**
142    * Creates an {@code MonitorBasedArrayBlockingQueue} with the given (fixed) capacity and default
143    * access policy.
144    *
145    * @param capacity the capacity of this queue
146    * @throws IllegalArgumentException if {@code capacity} is less than 1
147    */
MonitorBasedArrayBlockingQueue(int capacity)148   public MonitorBasedArrayBlockingQueue(int capacity) {
149     this(capacity, false);
150   }
151 
152   /**
153    * Creates an {@code MonitorBasedArrayBlockingQueue} with the given (fixed) capacity and the
154    * specified access policy.
155    *
156    * @param capacity the capacity of this queue
157    * @param fair if {@code true} then queue accesses for threads blocked on insertion or removal,
158    *     are processed in FIFO order; if {@code false} the access order is unspecified.
159    * @throws IllegalArgumentException if {@code capacity} is less than 1
160    */
MonitorBasedArrayBlockingQueue(int capacity, boolean fair)161   public MonitorBasedArrayBlockingQueue(int capacity, boolean fair) {
162     if (capacity <= 0) throw new IllegalArgumentException();
163     this.items = newEArray(capacity);
164     monitor = new Monitor(fair);
165     notEmpty =
166         new Monitor.Guard(monitor) {
167           @Override
168           public boolean isSatisfied() {
169             return count > 0;
170           }
171         };
172     notFull =
173         new Monitor.Guard(monitor) {
174           @Override
175           public boolean isSatisfied() {
176             return count < items.length;
177           }
178         };
179   }
180 
181   /**
182    * Creates an {@code MonitorBasedArrayBlockingQueue} with the given (fixed) capacity, the
183    * specified access policy and initially containing the elements of the given collection, added in
184    * traversal order of the collection's iterator.
185    *
186    * @param capacity the capacity of this queue
187    * @param fair if {@code true} then queue accesses for threads blocked on insertion or removal,
188    *     are processed in FIFO order; if {@code false} the access order is unspecified.
189    * @param c the collection of elements to initially contain
190    * @throws IllegalArgumentException if {@code capacity} is less than {@code c.size()}, or less
191    *     than 1.
192    * @throws NullPointerException if the specified collection or any of its elements are null
193    */
MonitorBasedArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)194   public MonitorBasedArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
195     this(capacity, fair);
196     if (capacity < c.size()) throw new IllegalArgumentException();
197 
198     for (E e : c) add(e);
199   }
200 
201   @SuppressWarnings("unchecked") // please don't try this home, kids
newEArray(int capacity)202   private static <E> E[] newEArray(int capacity) {
203     return (E[]) new Object[capacity];
204   }
205 
206   /**
207    * Inserts the specified element at the tail of this queue if it is possible to do so immediately
208    * without exceeding the queue's capacity, returning {@code true} upon success and throwing an
209    * {@code IllegalStateException} if this queue is full.
210    *
211    * @param e the element to add
212    * @return {@code true} (as specified by {@link Collection#add})
213    * @throws IllegalStateException if this queue is full
214    * @throws NullPointerException if the specified element is null
215    */
216   @CanIgnoreReturnValue
217   @Override
add(E e)218   public boolean add(E e) {
219     return super.add(e);
220   }
221 
222   /**
223    * Inserts the specified element at the tail of this queue if it is possible to do so immediately
224    * without exceeding the queue's capacity, returning {@code true} upon success and {@code false}
225    * if this queue is full. This method is generally preferable to method {@link #add}, which can
226    * fail to insert an element only by throwing an exception.
227    *
228    * @throws NullPointerException if the specified element is null
229    */
230   @CanIgnoreReturnValue
231   @Override
offer(E e)232   public boolean offer(E e) {
233     if (e == null) throw new NullPointerException();
234     final Monitor monitor = this.monitor;
235     if (monitor.enterIf(notFull)) {
236       try {
237         insert(e);
238         return true;
239       } finally {
240         monitor.leave();
241       }
242     } else {
243       return false;
244     }
245   }
246 
247   /**
248    * Inserts the specified element at the tail of this queue, waiting up to the specified wait time
249    * for space to become available if the queue is full.
250    *
251    * @throws InterruptedException {@inheritDoc}
252    * @throws NullPointerException {@inheritDoc}
253    */
254   @CanIgnoreReturnValue
255   @Override
offer(E e, long timeout, TimeUnit unit)256   public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
257 
258     if (e == null) throw new NullPointerException();
259     final Monitor monitor = this.monitor;
260     if (monitor.enterWhen(notFull, timeout, unit)) {
261       try {
262         insert(e);
263         return true;
264       } finally {
265         monitor.leave();
266       }
267     } else {
268       return false;
269     }
270   }
271 
272   /**
273    * Inserts the specified element at the tail of this queue, waiting for space to become available
274    * if the queue is full.
275    *
276    * @throws InterruptedException {@inheritDoc}
277    * @throws NullPointerException {@inheritDoc}
278    */
279   @Override
put(E e)280   public void put(E e) throws InterruptedException {
281     if (e == null) throw new NullPointerException();
282     final Monitor monitor = this.monitor;
283     monitor.enterWhen(notFull);
284     try {
285       insert(e);
286     } finally {
287       monitor.leave();
288     }
289   }
290 
291   @CanIgnoreReturnValue
292   @Override
poll()293   public @Nullable E poll() {
294     final Monitor monitor = this.monitor;
295     if (monitor.enterIf(notEmpty)) {
296       try {
297         return extract();
298       } finally {
299         monitor.leave();
300       }
301     } else {
302       return null;
303     }
304   }
305 
306   @CanIgnoreReturnValue
307   @Override
poll(long timeout, TimeUnit unit)308   public @Nullable E poll(long timeout, TimeUnit unit) throws InterruptedException {
309     final Monitor monitor = this.monitor;
310     if (monitor.enterWhen(notEmpty, timeout, unit)) {
311       try {
312         return extract();
313       } finally {
314         monitor.leave();
315       }
316     } else {
317       return null;
318     }
319   }
320 
321   @CanIgnoreReturnValue
322   @Override
take()323   public E take() throws InterruptedException {
324     final Monitor monitor = this.monitor;
325     monitor.enterWhen(notEmpty);
326     try {
327       return extract();
328     } finally {
329       monitor.leave();
330     }
331   }
332 
333   @CanIgnoreReturnValue
334   @Override
peek()335   public @Nullable E peek() {
336     final Monitor monitor = this.monitor;
337     if (monitor.enterIf(notEmpty)) {
338       try {
339         return items[takeIndex];
340       } finally {
341         monitor.leave();
342       }
343     } else {
344       return null;
345     }
346   }
347 
348   // this doc comment is overridden to remove the reference to collections
349   // greater in size than Integer.MAX_VALUE
350   /**
351    * Returns the number of elements in this queue.
352    *
353    * @return the number of elements in this queue
354    */
355   @CanIgnoreReturnValue
356   @Override
size()357   public int size() {
358     final Monitor monitor = this.monitor;
359     monitor.enter();
360     try {
361       return count;
362     } finally {
363       monitor.leave();
364     }
365   }
366 
367   // this doc comment is a modified copy of the inherited doc comment,
368   // without the reference to unlimited queues.
369   /**
370    * Returns the number of additional elements that this queue can ideally (in the absence of memory
371    * or resource constraints) accept without blocking. This is always equal to the initial capacity
372    * of this queue less the current {@code size} of this queue.
373    *
374    * <p>Note that you <em>cannot</em> always tell if an attempt to insert an element will succeed by
375    * inspecting {@code remainingCapacity} because it may be the case that another thread is about to
376    * insert or remove an element.
377    */
378   @CanIgnoreReturnValue
379   @Override
remainingCapacity()380   public int remainingCapacity() {
381     final Monitor monitor = this.monitor;
382     monitor.enter();
383     try {
384       return items.length - count;
385     } finally {
386       monitor.leave();
387     }
388   }
389 
390   /**
391    * Removes a single instance of the specified element from this queue, if it is present. More
392    * formally, removes an element {@code e} such that {@code o.equals(e)}, if this queue contains
393    * one or more such elements. Returns {@code true} if this queue contained the specified element
394    * (or equivalently, if this queue changed as a result of the call).
395    *
396    * @param o element to be removed from this queue, if present
397    * @return {@code true} if this queue changed as a result of the call
398    */
399   @CanIgnoreReturnValue
400   @Override
remove(@ullable Object o)401   public boolean remove(@Nullable Object o) {
402     if (o == null) return false;
403     final E[] items = this.items;
404     final Monitor monitor = this.monitor;
405     monitor.enter();
406     try {
407       int i = takeIndex;
408       int k = 0;
409       for (; ; ) {
410         if (k++ >= count) return false;
411         if (o.equals(items[i])) {
412           removeAt(i);
413           return true;
414         }
415         i = inc(i);
416       }
417     } finally {
418       monitor.leave();
419     }
420   }
421 
422   /**
423    * Returns {@code true} if this queue contains the specified element. More formally, returns
424    * {@code true} if and only if this queue contains at least one element {@code e} such that {@code
425    * o.equals(e)}.
426    *
427    * @param o object to be checked for containment in this queue
428    * @return {@code true} if this queue contains the specified element
429    */
430   @CanIgnoreReturnValue
431   @Override
contains(@ullable Object o)432   public boolean contains(@Nullable Object o) {
433     if (o == null) return false;
434     final E[] items = this.items;
435     final Monitor monitor = this.monitor;
436     monitor.enter();
437     try {
438       int i = takeIndex;
439       int k = 0;
440       while (k++ < count) {
441         if (o.equals(items[i])) return true;
442         i = inc(i);
443       }
444       return false;
445     } finally {
446       monitor.leave();
447     }
448   }
449 
450   /**
451    * Returns an array containing all of the elements in this queue, in proper sequence.
452    *
453    * <p>The returned array will be "safe" in that no references to it are maintained by this queue.
454    * (In other words, this method must allocate a new array). The caller is thus free to modify the
455    * returned array.
456    *
457    * <p>This method acts as bridge between array-based and collection-based APIs.
458    *
459    * @return an array containing all of the elements in this queue
460    */
461   @CanIgnoreReturnValue
462   @Override
toArray()463   public Object[] toArray() {
464     final E[] items = this.items;
465     final Monitor monitor = this.monitor;
466     monitor.enter();
467     try {
468       Object[] a = new Object[count];
469       int k = 0;
470       int i = takeIndex;
471       while (k < count) {
472         a[k++] = items[i];
473         i = inc(i);
474       }
475       return a;
476     } finally {
477       monitor.leave();
478     }
479   }
480 
481   /**
482    * Returns an array containing all of the elements in this queue, in proper sequence; the runtime
483    * type of the returned array is that of the specified array. If the queue fits in the specified
484    * array, it is returned therein. Otherwise, a new array is allocated with the runtime type of the
485    * specified array and the size of this queue.
486    *
487    * <p>If this queue fits in the specified array with room to spare (i.e., the array has more
488    * elements than this queue), the element in the array immediately following the end of the queue
489    * is set to {@code null}.
490    *
491    * <p>Like the {@link #toArray()} method, this method acts as bridge between array-based and
492    * collection-based APIs. Further, this method allows precise control over the runtime type of the
493    * output array, and may, under certain circumstances, be used to save allocation costs.
494    *
495    * <p>Suppose {@code x} is a queue known to contain only strings. The following code can be used
496    * to dump the queue into a newly allocated array of {@code String}:
497    *
498    * <pre>
499    *     String[] y = x.toArray(new String[0]);</pre>
500    *
501    * <p>Note that {@code toArray(new Object[0])} is identical in function to {@code toArray()}.
502    *
503    * @param a the array into which the elements of the queue are to be stored, if it is big enough;
504    *     otherwise, a new array of the same runtime type is allocated for this purpose
505    * @return an array containing all of the elements in this queue
506    * @throws ArrayStoreException if the runtime type of the specified array is not a supertype of
507    *     the runtime type of every element in this queue
508    * @throws NullPointerException if the specified array is null
509    */
510   @CanIgnoreReturnValue
511   @Override
toArray(T[] a)512   public <T> T[] toArray(T[] a) {
513     final E[] items = this.items;
514     final Monitor monitor = this.monitor;
515     monitor.enter();
516     try {
517       if (a.length < count) a = ObjectArrays.newArray(a, count);
518 
519       int k = 0;
520       int i = takeIndex;
521       while (k < count) {
522         // This cast is not itself safe, but the following statement
523         // will fail if the runtime type of items[i] is not assignable
524         // to the runtime type of a[k++], which is all that the method
525         // contract requires (see @throws ArrayStoreException above).
526         @SuppressWarnings("unchecked")
527         T t = (T) items[i];
528         a[k++] = t;
529         i = inc(i);
530       }
531       if (a.length > count) a[count] = null;
532       return a;
533     } finally {
534       monitor.leave();
535     }
536   }
537 
538   @CanIgnoreReturnValue
539   @Override
toString()540   public String toString() {
541     final Monitor monitor = this.monitor;
542     monitor.enter();
543     try {
544       return super.toString();
545     } finally {
546       monitor.leave();
547     }
548   }
549 
550   /**
551    * Atomically removes all of the elements from this queue. The queue will be empty after this call
552    * returns.
553    */
554   @Override
clear()555   public void clear() {
556     final E[] items = this.items;
557     final Monitor monitor = this.monitor;
558     monitor.enter();
559     try {
560       int i = takeIndex;
561       int k = count;
562       while (k-- > 0) {
563         items[i] = null;
564         i = inc(i);
565       }
566       count = 0;
567       putIndex = 0;
568       takeIndex = 0;
569     } finally {
570       monitor.leave();
571     }
572   }
573 
574   /**
575    * @throws UnsupportedOperationException {@inheritDoc}
576    * @throws ClassCastException {@inheritDoc}
577    * @throws NullPointerException {@inheritDoc}
578    * @throws IllegalArgumentException {@inheritDoc}
579    */
580   @CanIgnoreReturnValue
581   @Override
drainTo(Collection<? super E> c)582   public int drainTo(Collection<? super E> c) {
583     if (c == null) throw new NullPointerException();
584     if (c == this) throw new IllegalArgumentException();
585     final E[] items = this.items;
586     final Monitor monitor = this.monitor;
587     monitor.enter();
588     try {
589       int i = takeIndex;
590       int n = 0;
591       int max = count;
592       while (n < max) {
593         c.add(items[i]);
594         items[i] = null;
595         i = inc(i);
596         ++n;
597       }
598       if (n > 0) {
599         count = 0;
600         putIndex = 0;
601         takeIndex = 0;
602       }
603       return n;
604     } finally {
605       monitor.leave();
606     }
607   }
608 
609   /**
610    * @throws UnsupportedOperationException {@inheritDoc}
611    * @throws ClassCastException {@inheritDoc}
612    * @throws NullPointerException {@inheritDoc}
613    * @throws IllegalArgumentException {@inheritDoc}
614    */
615   @CanIgnoreReturnValue
616   @Override
drainTo(Collection<? super E> c, int maxElements)617   public int drainTo(Collection<? super E> c, int maxElements) {
618     if (c == null) throw new NullPointerException();
619     if (c == this) throw new IllegalArgumentException();
620     if (maxElements <= 0) return 0;
621     final E[] items = this.items;
622     final Monitor monitor = this.monitor;
623     monitor.enter();
624     try {
625       int i = takeIndex;
626       int n = 0;
627       int max = (maxElements < count) ? maxElements : count;
628       while (n < max) {
629         c.add(items[i]);
630         items[i] = null;
631         i = inc(i);
632         ++n;
633       }
634       if (n > 0) {
635         count -= n;
636         takeIndex = i;
637       }
638       return n;
639     } finally {
640       monitor.leave();
641     }
642   }
643 
644   /**
645    * Returns an iterator over the elements in this queue in proper sequence. The returned {@code
646    * Iterator} is a "weakly consistent" iterator that will never throw {@link
647    * ConcurrentModificationException}, and guarantees to traverse elements as they existed upon
648    * construction of the iterator, and may (but is not guaranteed to) reflect any modifications
649    * subsequent to construction.
650    *
651    * @return an iterator over the elements in this queue in proper sequence
652    */
653   @CanIgnoreReturnValue
654   @Override
iterator()655   public Iterator<E> iterator() {
656     final Monitor monitor = this.monitor;
657     monitor.enter();
658     try {
659       return new Itr();
660     } finally {
661       monitor.leave();
662     }
663   }
664 
665   /** Iterator for MonitorBasedArrayBlockingQueue */
666   private class Itr implements Iterator<E> {
667     /** Index of element to be returned by next, or a negative number if no such. */
668     private int nextIndex;
669 
670     /**
671      * nextItem holds on to item fields because once we claim that an element exists in hasNext(),
672      * we must return it in the following next() call even if it was in the process of being removed
673      * when hasNext() was called.
674      */
675     private @Nullable E nextItem;
676 
677     /**
678      * Index of element returned by most recent call to next. Reset to -1 if this element is deleted
679      * by a call to remove.
680      */
681     private int lastRet;
682 
Itr()683     Itr() {
684       lastRet = -1;
685       if (count == 0) nextIndex = -1;
686       else {
687         nextIndex = takeIndex;
688         nextItem = items[takeIndex];
689       }
690     }
691 
692     @Override
hasNext()693     public boolean hasNext() {
694       /*
695        * No sync. We can return true by mistake here
696        * only if this iterator passed across threads,
697        * which we don't support anyway.
698        */
699       return nextIndex >= 0;
700     }
701 
702     /**
703      * Checks whether nextIndex is valid; if so setting nextItem. Stops iterator when either hits
704      * putIndex or sees null item.
705      */
checkNext()706     private void checkNext() {
707       if (nextIndex == putIndex) {
708         nextIndex = -1;
709         nextItem = null;
710       } else {
711         nextItem = items[nextIndex];
712         if (nextItem == null) nextIndex = -1;
713       }
714     }
715 
716     @Override
next()717     public E next() {
718       final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor;
719       monitor.enter();
720       try {
721         if (nextIndex < 0) throw new NoSuchElementException();
722         lastRet = nextIndex;
723         E x = nextItem;
724         nextIndex = inc(nextIndex);
725         checkNext();
726         return x;
727       } finally {
728         monitor.leave();
729       }
730     }
731 
732     @Override
remove()733     public void remove() {
734       final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor;
735       monitor.enter();
736       try {
737         int i = lastRet;
738         if (i == -1) throw new IllegalStateException();
739         lastRet = -1;
740 
741         int ti = takeIndex;
742         removeAt(i);
743         // back up cursor (reset to front if was first element)
744         nextIndex = (i == ti) ? takeIndex : i;
745         checkNext();
746       } finally {
747         monitor.leave();
748       }
749     }
750   }
751 }
752