• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.util.AbstractQueue;
39 import java.util.Arrays;
40 import java.util.Collection;
41 import java.util.Comparator;
42 import java.util.Iterator;
43 import java.util.NoSuchElementException;
44 import java.util.PriorityQueue;
45 import java.util.Queue;
46 import java.util.SortedSet;
47 import java.util.Spliterator;
48 import java.util.concurrent.locks.Condition;
49 import java.util.concurrent.locks.ReentrantLock;
50 import java.util.function.Consumer;
51 
52 // BEGIN android-note
53 // removed link to collections framework docs
54 // END android-note
55 
56 /**
57  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
58  * the same ordering rules as class {@link PriorityQueue} and supplies
59  * blocking retrieval operations.  While this queue is logically
60  * unbounded, attempted additions may fail due to resource exhaustion
61  * (causing {@code OutOfMemoryError}). This class does not permit
62  * {@code null} elements.  A priority queue relying on {@linkplain
63  * Comparable natural ordering} also does not permit insertion of
64  * non-comparable objects (doing so results in
65  * {@code ClassCastException}).
66  *
67  * <p>This class and its iterator implement all of the
68  * <em>optional</em> methods of the {@link Collection} and {@link
69  * Iterator} interfaces.  The Iterator provided in method {@link
70  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
71  * the PriorityBlockingQueue in any particular order. If you need
72  * ordered traversal, consider using
73  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
74  * can be used to <em>remove</em> some or all elements in priority
75  * order and place them in another collection.
76  *
77  * <p>Operations on this class make no guarantees about the ordering
78  * of elements with equal priority. If you need to enforce an
79  * ordering, you can define custom classes or comparators that use a
80  * secondary key to break ties in primary priority values.  For
81  * example, here is a class that applies first-in-first-out
82  * tie-breaking to comparable elements. To use it, you would insert a
83  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
84  *
85  * <pre> {@code
86  * class FIFOEntry<E extends Comparable<? super E>>
87  *     implements Comparable<FIFOEntry<E>> {
88  *   static final AtomicLong seq = new AtomicLong(0);
89  *   final long seqNum;
90  *   final E entry;
91  *   public FIFOEntry(E entry) {
92  *     seqNum = seq.getAndIncrement();
93  *     this.entry = entry;
94  *   }
95  *   public E getEntry() { return entry; }
96  *   public int compareTo(FIFOEntry<E> other) {
97  *     int res = entry.compareTo(other.entry);
98  *     if (res == 0 && other.entry != this.entry)
99  *       res = (seqNum < other.seqNum ? -1 : 1);
100  *     return res;
101  *   }
102  * }}</pre>
103  *
104  * @since 1.5
105  * @author Doug Lea
106  * @param <E> the type of elements held in this queue
107  */
108 @SuppressWarnings("unchecked")
109 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
110     implements BlockingQueue<E>, java.io.Serializable {
111     private static final long serialVersionUID = 5595510919245408276L;
112 
113     /*
114      * The implementation uses an array-based binary heap, with public
115      * operations protected with a single lock. However, allocation
116      * during resizing uses a simple spinlock (used only while not
117      * holding main lock) in order to allow takes to operate
118      * concurrently with allocation.  This avoids repeated
119      * postponement of waiting consumers and consequent element
120      * build-up. The need to back away from lock during allocation
121      * makes it impossible to simply wrap delegated
122      * java.util.PriorityQueue operations within a lock, as was done
123      * in a previous version of this class. To maintain
124      * interoperability, a plain PriorityQueue is still used during
125      * serialization, which maintains compatibility at the expense of
126      * transiently doubling overhead.
127      */
128 
129     /**
130      * Default array capacity.
131      */
132     private static final int DEFAULT_INITIAL_CAPACITY = 11;
133 
134     /**
135      * The maximum size of array to allocate.
136      * Some VMs reserve some header words in an array.
137      * Attempts to allocate larger arrays may result in
138      * OutOfMemoryError: Requested array size exceeds VM limit
139      */
140     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
141 
142     /**
143      * Priority queue represented as a balanced binary heap: the two
144      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
145      * priority queue is ordered by comparator, or by the elements'
146      * natural ordering, if comparator is null: For each node n in the
147      * heap and each descendant d of n, n <= d.  The element with the
148      * lowest value is in queue[0], assuming the queue is nonempty.
149      */
150     private transient Object[] queue;
151 
152     /**
153      * The number of elements in the priority queue.
154      */
155     private transient int size;
156 
157     /**
158      * The comparator, or null if priority queue uses elements'
159      * natural ordering.
160      */
161     private transient Comparator<? super E> comparator;
162 
163     /**
164      * Lock used for all public operations.
165      */
166     private final ReentrantLock lock;
167 
168     /**
169      * Condition for blocking when empty.
170      */
171     private final Condition notEmpty;
172 
173     /**
174      * Spinlock for allocation, acquired via CAS.
175      */
176     private transient volatile int allocationSpinLock;
177 
178     /**
179      * A plain PriorityQueue used only for serialization,
180      * to maintain compatibility with previous versions
181      * of this class. Non-null only during serialization/deserialization.
182      */
183     private PriorityQueue<E> q;
184 
185     /**
186      * Creates a {@code PriorityBlockingQueue} with the default
187      * initial capacity (11) that orders its elements according to
188      * their {@linkplain Comparable natural ordering}.
189      */
PriorityBlockingQueue()190     public PriorityBlockingQueue() {
191         this(DEFAULT_INITIAL_CAPACITY, null);
192     }
193 
194     /**
195      * Creates a {@code PriorityBlockingQueue} with the specified
196      * initial capacity that orders its elements according to their
197      * {@linkplain Comparable natural ordering}.
198      *
199      * @param initialCapacity the initial capacity for this priority queue
200      * @throws IllegalArgumentException if {@code initialCapacity} is less
201      *         than 1
202      */
PriorityBlockingQueue(int initialCapacity)203     public PriorityBlockingQueue(int initialCapacity) {
204         this(initialCapacity, null);
205     }
206 
207     /**
208      * Creates a {@code PriorityBlockingQueue} with the specified initial
209      * capacity that orders its elements according to the specified
210      * comparator.
211      *
212      * @param initialCapacity the initial capacity for this priority queue
213      * @param  comparator the comparator that will be used to order this
214      *         priority queue.  If {@code null}, the {@linkplain Comparable
215      *         natural ordering} of the elements will be used.
216      * @throws IllegalArgumentException if {@code initialCapacity} is less
217      *         than 1
218      */
PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)219     public PriorityBlockingQueue(int initialCapacity,
220                                  Comparator<? super E> comparator) {
221         if (initialCapacity < 1)
222             throw new IllegalArgumentException();
223         this.lock = new ReentrantLock();
224         this.notEmpty = lock.newCondition();
225         this.comparator = comparator;
226         this.queue = new Object[initialCapacity];
227     }
228 
229     /**
230      * Creates a {@code PriorityBlockingQueue} containing the elements
231      * in the specified collection.  If the specified collection is a
232      * {@link SortedSet} or a {@link PriorityQueue}, this
233      * priority queue will be ordered according to the same ordering.
234      * Otherwise, this priority queue will be ordered according to the
235      * {@linkplain Comparable natural ordering} of its elements.
236      *
237      * @param  c the collection whose elements are to be placed
238      *         into this priority queue
239      * @throws ClassCastException if elements of the specified collection
240      *         cannot be compared to one another according to the priority
241      *         queue's ordering
242      * @throws NullPointerException if the specified collection or any
243      *         of its elements are null
244      */
PriorityBlockingQueue(Collection<? extends E> c)245     public PriorityBlockingQueue(Collection<? extends E> c) {
246         this.lock = new ReentrantLock();
247         this.notEmpty = lock.newCondition();
248         boolean heapify = true; // true if not known to be in heap order
249         boolean screen = true;  // true if must screen for nulls
250         if (c instanceof SortedSet<?>) {
251             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
252             this.comparator = (Comparator<? super E>) ss.comparator();
253             heapify = false;
254         }
255         else if (c instanceof PriorityBlockingQueue<?>) {
256             PriorityBlockingQueue<? extends E> pq =
257                 (PriorityBlockingQueue<? extends E>) c;
258             this.comparator = (Comparator<? super E>) pq.comparator();
259             screen = false;
260             if (pq.getClass() == PriorityBlockingQueue.class) // exact match
261                 heapify = false;
262         }
263         Object[] a = c.toArray();
264         int n = a.length;
265         // If c.toArray incorrectly doesn't return Object[], copy it.
266         if (a.getClass() != Object[].class)
267             a = Arrays.copyOf(a, n, Object[].class);
268         if (screen && (n == 1 || this.comparator != null)) {
269             for (int i = 0; i < n; ++i)
270                 if (a[i] == null)
271                     throw new NullPointerException();
272         }
273         this.queue = a;
274         this.size = n;
275         if (heapify)
276             heapify();
277     }
278 
279     /**
280      * Tries to grow array to accommodate at least one more element
281      * (but normally expand by about 50%), giving up (allowing retry)
282      * on contention (which we expect to be rare). Call only while
283      * holding lock.
284      *
285      * @param array the heap array
286      * @param oldCap the length of the array
287      */
tryGrow(Object[] array, int oldCap)288     private void tryGrow(Object[] array, int oldCap) {
289         lock.unlock(); // must release and then re-acquire main lock
290         Object[] newArray = null;
291         if (allocationSpinLock == 0 &&
292             U.compareAndSwapInt(this, ALLOCATIONSPINLOCK, 0, 1)) {
293             try {
294                 int newCap = oldCap + ((oldCap < 64) ?
295                                        (oldCap + 2) : // grow faster if small
296                                        (oldCap >> 1));
297                 if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
298                     int minCap = oldCap + 1;
299                     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
300                         throw new OutOfMemoryError();
301                     newCap = MAX_ARRAY_SIZE;
302                 }
303                 if (newCap > oldCap && queue == array)
304                     newArray = new Object[newCap];
305             } finally {
306                 allocationSpinLock = 0;
307             }
308         }
309         if (newArray == null) // back off if another thread is allocating
310             Thread.yield();
311         lock.lock();
312         if (newArray != null && queue == array) {
313             queue = newArray;
314             System.arraycopy(array, 0, newArray, 0, oldCap);
315         }
316     }
317 
318     /**
319      * Mechanics for poll().  Call only while holding lock.
320      */
dequeue()321     private E dequeue() {
322         int n = size - 1;
323         if (n < 0)
324             return null;
325         else {
326             Object[] array = queue;
327             E result = (E) array[0];
328             E x = (E) array[n];
329             array[n] = null;
330             Comparator<? super E> cmp = comparator;
331             if (cmp == null)
332                 siftDownComparable(0, x, array, n);
333             else
334                 siftDownUsingComparator(0, x, array, n, cmp);
335             size = n;
336             return result;
337         }
338     }
339 
340     /**
341      * Inserts item x at position k, maintaining heap invariant by
342      * promoting x up the tree until it is greater than or equal to
343      * its parent, or is the root.
344      *
345      * To simplify and speed up coercions and comparisons, the
346      * Comparable and Comparator versions are separated into different
347      * methods that are otherwise identical. (Similarly for siftDown.)
348      * These methods are static, with heap state as arguments, to
349      * simplify use in light of possible comparator exceptions.
350      *
351      * @param k the position to fill
352      * @param x the item to insert
353      * @param array the heap array
354      */
siftUpComparable(int k, T x, Object[] array)355     private static <T> void siftUpComparable(int k, T x, Object[] array) {
356         Comparable<? super T> key = (Comparable<? super T>) x;
357         while (k > 0) {
358             int parent = (k - 1) >>> 1;
359             Object e = array[parent];
360             if (key.compareTo((T) e) >= 0)
361                 break;
362             array[k] = e;
363             k = parent;
364         }
365         array[k] = key;
366     }
367 
siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp)368     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
369                                        Comparator<? super T> cmp) {
370         while (k > 0) {
371             int parent = (k - 1) >>> 1;
372             Object e = array[parent];
373             if (cmp.compare(x, (T) e) >= 0)
374                 break;
375             array[k] = e;
376             k = parent;
377         }
378         array[k] = x;
379     }
380 
381     /**
382      * Inserts item x at position k, maintaining heap invariant by
383      * demoting x down the tree repeatedly until it is less than or
384      * equal to its children or is a leaf.
385      *
386      * @param k the position to fill
387      * @param x the item to insert
388      * @param array the heap array
389      * @param n heap size
390      */
siftDownComparable(int k, T x, Object[] array, int n)391     private static <T> void siftDownComparable(int k, T x, Object[] array,
392                                                int n) {
393         if (n > 0) {
394             Comparable<? super T> key = (Comparable<? super T>)x;
395             int half = n >>> 1;           // loop while a non-leaf
396             while (k < half) {
397                 int child = (k << 1) + 1; // assume left child is least
398                 Object c = array[child];
399                 int right = child + 1;
400                 if (right < n &&
401                     ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
402                     c = array[child = right];
403                 if (key.compareTo((T) c) <= 0)
404                     break;
405                 array[k] = c;
406                 k = child;
407             }
408             array[k] = key;
409         }
410     }
411 
siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp)412     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
413                                                     int n,
414                                                     Comparator<? super T> cmp) {
415         if (n > 0) {
416             int half = n >>> 1;
417             while (k < half) {
418                 int child = (k << 1) + 1;
419                 Object c = array[child];
420                 int right = child + 1;
421                 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
422                     c = array[child = right];
423                 if (cmp.compare(x, (T) c) <= 0)
424                     break;
425                 array[k] = c;
426                 k = child;
427             }
428             array[k] = x;
429         }
430     }
431 
432     /**
433      * Establishes the heap invariant (described above) in the entire tree,
434      * assuming nothing about the order of the elements prior to the call.
435      */
heapify()436     private void heapify() {
437         Object[] array = queue;
438         int n = size;
439         int half = (n >>> 1) - 1;
440         Comparator<? super E> cmp = comparator;
441         if (cmp == null) {
442             for (int i = half; i >= 0; i--)
443                 siftDownComparable(i, (E) array[i], array, n);
444         }
445         else {
446             for (int i = half; i >= 0; i--)
447                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
448         }
449     }
450 
451     /**
452      * Inserts the specified element into this priority queue.
453      *
454      * @param e the element to add
455      * @return {@code true} (as specified by {@link Collection#add})
456      * @throws ClassCastException if the specified element cannot be compared
457      *         with elements currently in the priority queue according to the
458      *         priority queue's ordering
459      * @throws NullPointerException if the specified element is null
460      */
add(E e)461     public boolean add(E e) {
462         return offer(e);
463     }
464 
465     /**
466      * Inserts the specified element into this priority queue.
467      * As the queue is unbounded, this method will never return {@code false}.
468      *
469      * @param e the element to add
470      * @return {@code true} (as specified by {@link Queue#offer})
471      * @throws ClassCastException if the specified element cannot be compared
472      *         with elements currently in the priority queue according to the
473      *         priority queue's ordering
474      * @throws NullPointerException if the specified element is null
475      */
offer(E e)476     public boolean offer(E e) {
477         if (e == null)
478             throw new NullPointerException();
479         final ReentrantLock lock = this.lock;
480         lock.lock();
481         int n, cap;
482         Object[] array;
483         while ((n = size) >= (cap = (array = queue).length))
484             tryGrow(array, cap);
485         try {
486             Comparator<? super E> cmp = comparator;
487             if (cmp == null)
488                 siftUpComparable(n, e, array);
489             else
490                 siftUpUsingComparator(n, e, array, cmp);
491             size = n + 1;
492             notEmpty.signal();
493         } finally {
494             lock.unlock();
495         }
496         return true;
497     }
498 
499     /**
500      * Inserts the specified element into this priority queue.
501      * As the queue is unbounded, this method will never block.
502      *
503      * @param e the element to add
504      * @throws ClassCastException if the specified element cannot be compared
505      *         with elements currently in the priority queue according to the
506      *         priority queue's ordering
507      * @throws NullPointerException if the specified element is null
508      */
put(E e)509     public void put(E e) {
510         offer(e); // never need to block
511     }
512 
513     /**
514      * Inserts the specified element into this priority queue.
515      * As the queue is unbounded, this method will never block or
516      * return {@code false}.
517      *
518      * @param e the element to add
519      * @param timeout This parameter is ignored as the method never blocks
520      * @param unit This parameter is ignored as the method never blocks
521      * @return {@code true} (as specified by
522      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
523      * @throws ClassCastException if the specified element cannot be compared
524      *         with elements currently in the priority queue according to the
525      *         priority queue's ordering
526      * @throws NullPointerException if the specified element is null
527      */
offer(E e, long timeout, TimeUnit unit)528     public boolean offer(E e, long timeout, TimeUnit unit) {
529         return offer(e); // never need to block
530     }
531 
poll()532     public E poll() {
533         final ReentrantLock lock = this.lock;
534         lock.lock();
535         try {
536             return dequeue();
537         } finally {
538             lock.unlock();
539         }
540     }
541 
take()542     public E take() throws InterruptedException {
543         final ReentrantLock lock = this.lock;
544         lock.lockInterruptibly();
545         E result;
546         try {
547             while ( (result = dequeue()) == null)
548                 notEmpty.await();
549         } finally {
550             lock.unlock();
551         }
552         return result;
553     }
554 
poll(long timeout, TimeUnit unit)555     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
556         long nanos = unit.toNanos(timeout);
557         final ReentrantLock lock = this.lock;
558         lock.lockInterruptibly();
559         E result;
560         try {
561             while ( (result = dequeue()) == null && nanos > 0)
562                 nanos = notEmpty.awaitNanos(nanos);
563         } finally {
564             lock.unlock();
565         }
566         return result;
567     }
568 
peek()569     public E peek() {
570         final ReentrantLock lock = this.lock;
571         lock.lock();
572         try {
573             return (size == 0) ? null : (E) queue[0];
574         } finally {
575             lock.unlock();
576         }
577     }
578 
579     /**
580      * Returns the comparator used to order the elements in this queue,
581      * or {@code null} if this queue uses the {@linkplain Comparable
582      * natural ordering} of its elements.
583      *
584      * @return the comparator used to order the elements in this queue,
585      *         or {@code null} if this queue uses the natural
586      *         ordering of its elements
587      */
comparator()588     public Comparator<? super E> comparator() {
589         return comparator;
590     }
591 
size()592     public int size() {
593         final ReentrantLock lock = this.lock;
594         lock.lock();
595         try {
596             return size;
597         } finally {
598             lock.unlock();
599         }
600     }
601 
602     /**
603      * Always returns {@code Integer.MAX_VALUE} because
604      * a {@code PriorityBlockingQueue} is not capacity constrained.
605      * @return {@code Integer.MAX_VALUE} always
606      */
remainingCapacity()607     public int remainingCapacity() {
608         return Integer.MAX_VALUE;
609     }
610 
indexOf(Object o)611     private int indexOf(Object o) {
612         if (o != null) {
613             Object[] array = queue;
614             int n = size;
615             for (int i = 0; i < n; i++)
616                 if (o.equals(array[i]))
617                     return i;
618         }
619         return -1;
620     }
621 
622     /**
623      * Removes the ith element from queue.
624      */
removeAt(int i)625     private void removeAt(int i) {
626         Object[] array = queue;
627         int n = size - 1;
628         if (n == i) // removed last element
629             array[i] = null;
630         else {
631             E moved = (E) array[n];
632             array[n] = null;
633             Comparator<? super E> cmp = comparator;
634             if (cmp == null)
635                 siftDownComparable(i, moved, array, n);
636             else
637                 siftDownUsingComparator(i, moved, array, n, cmp);
638             if (array[i] == moved) {
639                 if (cmp == null)
640                     siftUpComparable(i, moved, array);
641                 else
642                     siftUpUsingComparator(i, moved, array, cmp);
643             }
644         }
645         size = n;
646     }
647 
648     /**
649      * Removes a single instance of the specified element from this queue,
650      * if it is present.  More formally, removes an element {@code e} such
651      * that {@code o.equals(e)}, if this queue contains one or more such
652      * elements.  Returns {@code true} if and only if this queue contained
653      * the specified element (or equivalently, if this queue changed as a
654      * result of the call).
655      *
656      * @param o element to be removed from this queue, if present
657      * @return {@code true} if this queue changed as a result of the call
658      */
remove(Object o)659     public boolean remove(Object o) {
660         final ReentrantLock lock = this.lock;
661         lock.lock();
662         try {
663             int i = indexOf(o);
664             if (i == -1)
665                 return false;
666             removeAt(i);
667             return true;
668         } finally {
669             lock.unlock();
670         }
671     }
672 
673     /**
674      * Identity-based version for use in Itr.remove.
675      */
removeEQ(Object o)676     void removeEQ(Object o) {
677         final ReentrantLock lock = this.lock;
678         lock.lock();
679         try {
680             Object[] array = queue;
681             for (int i = 0, n = size; i < n; i++) {
682                 if (o == array[i]) {
683                     removeAt(i);
684                     break;
685                 }
686             }
687         } finally {
688             lock.unlock();
689         }
690     }
691 
692     /**
693      * Returns {@code true} if this queue contains the specified element.
694      * More formally, returns {@code true} if and only if this queue contains
695      * at least one element {@code e} such that {@code o.equals(e)}.
696      *
697      * @param o object to be checked for containment in this queue
698      * @return {@code true} if this queue contains the specified element
699      */
contains(Object o)700     public boolean contains(Object o) {
701         final ReentrantLock lock = this.lock;
702         lock.lock();
703         try {
704             return indexOf(o) != -1;
705         } finally {
706             lock.unlock();
707         }
708     }
709 
toString()710     public String toString() {
711         return Helpers.collectionToString(this);
712     }
713 
714     /**
715      * @throws UnsupportedOperationException {@inheritDoc}
716      * @throws ClassCastException            {@inheritDoc}
717      * @throws NullPointerException          {@inheritDoc}
718      * @throws IllegalArgumentException      {@inheritDoc}
719      */
drainTo(Collection<? super E> c)720     public int drainTo(Collection<? super E> c) {
721         return drainTo(c, Integer.MAX_VALUE);
722     }
723 
724     /**
725      * @throws UnsupportedOperationException {@inheritDoc}
726      * @throws ClassCastException            {@inheritDoc}
727      * @throws NullPointerException          {@inheritDoc}
728      * @throws IllegalArgumentException      {@inheritDoc}
729      */
drainTo(Collection<? super E> c, int maxElements)730     public int drainTo(Collection<? super E> c, int maxElements) {
731         if (c == null)
732             throw new NullPointerException();
733         if (c == this)
734             throw new IllegalArgumentException();
735         if (maxElements <= 0)
736             return 0;
737         final ReentrantLock lock = this.lock;
738         lock.lock();
739         try {
740             int n = Math.min(size, maxElements);
741             for (int i = 0; i < n; i++) {
742                 c.add((E) queue[0]); // In this order, in case add() throws.
743                 dequeue();
744             }
745             return n;
746         } finally {
747             lock.unlock();
748         }
749     }
750 
751     /**
752      * Atomically removes all of the elements from this queue.
753      * The queue will be empty after this call returns.
754      */
clear()755     public void clear() {
756         final ReentrantLock lock = this.lock;
757         lock.lock();
758         try {
759             Object[] array = queue;
760             int n = size;
761             size = 0;
762             for (int i = 0; i < n; i++)
763                 array[i] = null;
764         } finally {
765             lock.unlock();
766         }
767     }
768 
769     /**
770      * Returns an array containing all of the elements in this queue.
771      * The returned array elements are in no particular order.
772      *
773      * <p>The returned array will be "safe" in that no references to it are
774      * maintained by this queue.  (In other words, this method must allocate
775      * a new array).  The caller is thus free to modify the returned array.
776      *
777      * <p>This method acts as bridge between array-based and collection-based
778      * APIs.
779      *
780      * @return an array containing all of the elements in this queue
781      */
toArray()782     public Object[] toArray() {
783         final ReentrantLock lock = this.lock;
784         lock.lock();
785         try {
786             return Arrays.copyOf(queue, size);
787         } finally {
788             lock.unlock();
789         }
790     }
791 
792     /**
793      * Returns an array containing all of the elements in this queue; the
794      * runtime type of the returned array is that of the specified array.
795      * The returned array elements are in no particular order.
796      * If the queue fits in the specified array, it is returned therein.
797      * Otherwise, a new array is allocated with the runtime type of the
798      * specified array and the size of this queue.
799      *
800      * <p>If this queue fits in the specified array with room to spare
801      * (i.e., the array has more elements than this queue), the element in
802      * the array immediately following the end of the queue is set to
803      * {@code null}.
804      *
805      * <p>Like the {@link #toArray()} method, this method acts as bridge between
806      * array-based and collection-based APIs.  Further, this method allows
807      * precise control over the runtime type of the output array, and may,
808      * under certain circumstances, be used to save allocation costs.
809      *
810      * <p>Suppose {@code x} is a queue known to contain only strings.
811      * The following code can be used to dump the queue into a newly
812      * allocated array of {@code String}:
813      *
814      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
815      *
816      * Note that {@code toArray(new Object[0])} is identical in function to
817      * {@code toArray()}.
818      *
819      * @param a the array into which the elements of the queue are to
820      *          be stored, if it is big enough; otherwise, a new array of the
821      *          same runtime type is allocated for this purpose
822      * @return an array containing all of the elements in this queue
823      * @throws ArrayStoreException if the runtime type of the specified array
824      *         is not a supertype of the runtime type of every element in
825      *         this queue
826      * @throws NullPointerException if the specified array is null
827      */
toArray(T[] a)828     public <T> T[] toArray(T[] a) {
829         final ReentrantLock lock = this.lock;
830         lock.lock();
831         try {
832             int n = size;
833             if (a.length < n)
834                 // Make a new array of a's runtime type, but my contents:
835                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
836             System.arraycopy(queue, 0, a, 0, n);
837             if (a.length > n)
838                 a[n] = null;
839             return a;
840         } finally {
841             lock.unlock();
842         }
843     }
844 
845     /**
846      * Returns an iterator over the elements in this queue. The
847      * iterator does not return the elements in any particular order.
848      *
849      * <p>The returned iterator is
850      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
851      *
852      * @return an iterator over the elements in this queue
853      */
iterator()854     public Iterator<E> iterator() {
855         return new Itr(toArray());
856     }
857 
858     /**
859      * Snapshot iterator that works off copy of underlying q array.
860      */
861     final class Itr implements Iterator<E> {
862         final Object[] array; // Array of all elements
863         int cursor;           // index of next element to return
864         int lastRet;          // index of last element, or -1 if no such
865 
Itr(Object[] array)866         Itr(Object[] array) {
867             lastRet = -1;
868             this.array = array;
869         }
870 
hasNext()871         public boolean hasNext() {
872             return cursor < array.length;
873         }
874 
next()875         public E next() {
876             if (cursor >= array.length)
877                 throw new NoSuchElementException();
878             lastRet = cursor;
879             return (E)array[cursor++];
880         }
881 
remove()882         public void remove() {
883             if (lastRet < 0)
884                 throw new IllegalStateException();
885             removeEQ(array[lastRet]);
886             lastRet = -1;
887         }
888     }
889 
890     /**
891      * Saves this queue to a stream (that is, serializes it).
892      *
893      * For compatibility with previous version of this class, elements
894      * are first copied to a java.util.PriorityQueue, which is then
895      * serialized.
896      *
897      * @param s the stream
898      * @throws java.io.IOException if an I/O error occurs
899      */
writeObject(java.io.ObjectOutputStream s)900     private void writeObject(java.io.ObjectOutputStream s)
901         throws java.io.IOException {
902         lock.lock();
903         try {
904             // avoid zero capacity argument
905             q = new PriorityQueue<E>(Math.max(size, 1), comparator);
906             q.addAll(this);
907             s.defaultWriteObject();
908         } finally {
909             q = null;
910             lock.unlock();
911         }
912     }
913 
914     /**
915      * Reconstitutes this queue from a stream (that is, deserializes it).
916      * @param s the stream
917      * @throws ClassNotFoundException if the class of a serialized object
918      *         could not be found
919      * @throws java.io.IOException if an I/O error occurs
920      */
readObject(java.io.ObjectInputStream s)921     private void readObject(java.io.ObjectInputStream s)
922         throws java.io.IOException, ClassNotFoundException {
923         try {
924             s.defaultReadObject();
925             this.queue = new Object[q.size()];
926             comparator = q.comparator();
927             addAll(q);
928         } finally {
929             q = null;
930         }
931     }
932 
933     // Similar to Collections.ArraySnapshotSpliterator but avoids
934     // commitment to toArray until needed
935     static final class PBQSpliterator<E> implements Spliterator<E> {
936         final PriorityBlockingQueue<E> queue;
937         Object[] array;
938         int index;
939         int fence;
940 
PBQSpliterator(PriorityBlockingQueue<E> queue, Object[] array, int index, int fence)941         PBQSpliterator(PriorityBlockingQueue<E> queue, Object[] array,
942                        int index, int fence) {
943             this.queue = queue;
944             this.array = array;
945             this.index = index;
946             this.fence = fence;
947         }
948 
getFence()949         final int getFence() {
950             int hi;
951             if ((hi = fence) < 0)
952                 hi = fence = (array = queue.toArray()).length;
953             return hi;
954         }
955 
trySplit()956         public PBQSpliterator<E> trySplit() {
957             int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
958             return (lo >= mid) ? null :
959                 new PBQSpliterator<E>(queue, array, lo, index = mid);
960         }
961 
962         @SuppressWarnings("unchecked")
forEachRemaining(Consumer<? super E> action)963         public void forEachRemaining(Consumer<? super E> action) {
964             Object[] a; int i, hi; // hoist accesses and checks from loop
965             if (action == null)
966                 throw new NullPointerException();
967             if ((a = array) == null)
968                 fence = (a = queue.toArray()).length;
969             if ((hi = fence) <= a.length &&
970                 (i = index) >= 0 && i < (index = hi)) {
971                 do { action.accept((E)a[i]); } while (++i < hi);
972             }
973         }
974 
tryAdvance(Consumer<? super E> action)975         public boolean tryAdvance(Consumer<? super E> action) {
976             if (action == null)
977                 throw new NullPointerException();
978             if (getFence() > index && index >= 0) {
979                 @SuppressWarnings("unchecked") E e = (E) array[index++];
980                 action.accept(e);
981                 return true;
982             }
983             return false;
984         }
985 
estimateSize()986         public long estimateSize() { return (long)(getFence() - index); }
987 
characteristics()988         public int characteristics() {
989             return Spliterator.NONNULL | Spliterator.SIZED | Spliterator.SUBSIZED;
990         }
991     }
992 
993     /**
994      * Returns a {@link Spliterator} over the elements in this queue.
995      *
996      * <p>The returned spliterator is
997      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
998      *
999      * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
1000      * {@link Spliterator#NONNULL}.
1001      *
1002      * @implNote
1003      * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
1004      *
1005      * @return a {@code Spliterator} over the elements in this queue
1006      * @since 1.8
1007      */
spliterator()1008     public Spliterator<E> spliterator() {
1009         return new PBQSpliterator<E>(this, null, 0, -1);
1010     }
1011 
// Unsafe mechanics: field offset of allocationSpinLock, looked up once
// at class initialization
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long ALLOCATIONSPINLOCK;
static {
    try {
        final java.lang.reflect.Field spinLockField =
            PriorityBlockingQueue.class.getDeclaredField("allocationSpinLock");
        ALLOCATIONSPINLOCK = U.objectFieldOffset(spinLockField);
    } catch (ReflectiveOperationException e) {
        // A failed field lookup is unrecoverable for this class.
        throw new Error(e);
    }
}
1023 }
1024