• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 
9 import java.util.AbstractQueue;
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Comparator;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.PriorityQueue;
16 import java.util.Queue;
17 import java.util.SortedSet;
18 import java.util.Spliterator;
19 import java.util.concurrent.locks.Condition;
20 import java.util.concurrent.locks.ReentrantLock;
21 import java.util.function.Consumer;
22 
23 // BEGIN android-note
24 // removed link to collections framework docs
25 // END android-note
26 
27 /**
28  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
29  * the same ordering rules as class {@link PriorityQueue} and supplies
30  * blocking retrieval operations.  While this queue is logically
31  * unbounded, attempted additions may fail due to resource exhaustion
32  * (causing {@code OutOfMemoryError}). This class does not permit
33  * {@code null} elements.  A priority queue relying on {@linkplain
34  * Comparable natural ordering} also does not permit insertion of
35  * non-comparable objects (doing so results in
36  * {@code ClassCastException}).
37  *
38  * <p>This class and its iterator implement all of the
39  * <em>optional</em> methods of the {@link Collection} and {@link
40  * Iterator} interfaces.  The Iterator provided in method {@link
41  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
42  * the PriorityBlockingQueue in any particular order. If you need
43  * ordered traversal, consider using
44  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
45  * can be used to <em>remove</em> some or all elements in priority
46  * order and place them in another collection.
47  *
48  * <p>Operations on this class make no guarantees about the ordering
49  * of elements with equal priority. If you need to enforce an
50  * ordering, you can define custom classes or comparators that use a
51  * secondary key to break ties in primary priority values.  For
52  * example, here is a class that applies first-in-first-out
53  * tie-breaking to comparable elements. To use it, you would insert a
54  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
55  *
56  * <pre> {@code
57  * class FIFOEntry<E extends Comparable<? super E>>
58  *     implements Comparable<FIFOEntry<E>> {
59  *   static final AtomicLong seq = new AtomicLong(0);
60  *   final long seqNum;
61  *   final E entry;
62  *   public FIFOEntry(E entry) {
63  *     seqNum = seq.getAndIncrement();
64  *     this.entry = entry;
65  *   }
66  *   public E getEntry() { return entry; }
67  *   public int compareTo(FIFOEntry<E> other) {
68  *     int res = entry.compareTo(other.entry);
69  *     if (res == 0 && other.entry != this.entry)
70  *       res = (seqNum < other.seqNum ? -1 : 1);
71  *     return res;
72  *   }
73  * }}</pre>
74  *
75  * @since 1.5
76  * @author Doug Lea
77  * @param <E> the type of elements held in this queue
78  */
79 @SuppressWarnings("unchecked")
80 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
81     implements BlockingQueue<E>, java.io.Serializable {
82     private static final long serialVersionUID = 5595510919245408276L;
83 
84     /*
85      * The implementation uses an array-based binary heap, with public
86      * operations protected with a single lock. However, allocation
87      * during resizing uses a simple spinlock (used only while not
88      * holding main lock) in order to allow takes to operate
89      * concurrently with allocation.  This avoids repeated
90      * postponement of waiting consumers and consequent element
91      * build-up. The need to back away from lock during allocation
92      * makes it impossible to simply wrap delegated
93      * java.util.PriorityQueue operations within a lock, as was done
94      * in a previous version of this class. To maintain
95      * interoperability, a plain PriorityQueue is still used during
96      * serialization, which maintains compatibility at the expense of
97      * transiently doubling overhead.
98      */
99 
100     /**
101      * Default array capacity.
102      */
103     private static final int DEFAULT_INITIAL_CAPACITY = 11;
104 
105     /**
106      * The maximum size of array to allocate.
107      * Some VMs reserve some header words in an array.
108      * Attempts to allocate larger arrays may result in
109      * OutOfMemoryError: Requested array size exceeds VM limit
110      */
111     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
112 
113     /**
114      * Priority queue represented as a balanced binary heap: the two
115      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
116      * priority queue is ordered by comparator, or by the elements'
117      * natural ordering, if comparator is null: For each node n in the
118      * heap and each descendant d of n, n <= d.  The element with the
119      * lowest value is in queue[0], assuming the queue is nonempty.
120      */
121     private transient Object[] queue;
122 
123     /**
124      * The number of elements in the priority queue.
125      */
126     private transient int size;
127 
128     /**
129      * The comparator, or null if priority queue uses elements'
130      * natural ordering.
131      */
132     private transient Comparator<? super E> comparator;
133 
134     /**
135      * Lock used for all public operations.
136      */
137     private final ReentrantLock lock;
138 
139     /**
140      * Condition for blocking when empty.
141      */
142     private final Condition notEmpty;
143 
144     /**
145      * Spinlock for allocation, acquired via CAS.
146      */
147     private transient volatile int allocationSpinLock;
148 
149     /**
150      * A plain PriorityQueue used only for serialization,
151      * to maintain compatibility with previous versions
152      * of this class. Non-null only during serialization/deserialization.
153      */
154     private PriorityQueue<E> q;
155 
156     /**
157      * Creates a {@code PriorityBlockingQueue} with the default
158      * initial capacity (11) that orders its elements according to
159      * their {@linkplain Comparable natural ordering}.
160      */
PriorityBlockingQueue()161     public PriorityBlockingQueue() {
162         this(DEFAULT_INITIAL_CAPACITY, null);
163     }
164 
165     /**
166      * Creates a {@code PriorityBlockingQueue} with the specified
167      * initial capacity that orders its elements according to their
168      * {@linkplain Comparable natural ordering}.
169      *
170      * @param initialCapacity the initial capacity for this priority queue
171      * @throws IllegalArgumentException if {@code initialCapacity} is less
172      *         than 1
173      */
PriorityBlockingQueue(int initialCapacity)174     public PriorityBlockingQueue(int initialCapacity) {
175         this(initialCapacity, null);
176     }
177 
178     /**
179      * Creates a {@code PriorityBlockingQueue} with the specified initial
180      * capacity that orders its elements according to the specified
181      * comparator.
182      *
183      * @param initialCapacity the initial capacity for this priority queue
184      * @param  comparator the comparator that will be used to order this
185      *         priority queue.  If {@code null}, the {@linkplain Comparable
186      *         natural ordering} of the elements will be used.
187      * @throws IllegalArgumentException if {@code initialCapacity} is less
188      *         than 1
189      */
PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)190     public PriorityBlockingQueue(int initialCapacity,
191                                  Comparator<? super E> comparator) {
192         if (initialCapacity < 1)
193             throw new IllegalArgumentException();
194         this.lock = new ReentrantLock();
195         this.notEmpty = lock.newCondition();
196         this.comparator = comparator;
197         this.queue = new Object[initialCapacity];
198     }
199 
200     /**
201      * Creates a {@code PriorityBlockingQueue} containing the elements
202      * in the specified collection.  If the specified collection is a
203      * {@link SortedSet} or a {@link PriorityQueue}, this
204      * priority queue will be ordered according to the same ordering.
205      * Otherwise, this priority queue will be ordered according to the
206      * {@linkplain Comparable natural ordering} of its elements.
207      *
208      * @param  c the collection whose elements are to be placed
209      *         into this priority queue
210      * @throws ClassCastException if elements of the specified collection
211      *         cannot be compared to one another according to the priority
212      *         queue's ordering
213      * @throws NullPointerException if the specified collection or any
214      *         of its elements are null
215      */
PriorityBlockingQueue(Collection<? extends E> c)216     public PriorityBlockingQueue(Collection<? extends E> c) {
217         this.lock = new ReentrantLock();
218         this.notEmpty = lock.newCondition();
219         boolean heapify = true; // true if not known to be in heap order
220         boolean screen = true;  // true if must screen for nulls
221         if (c instanceof SortedSet<?>) {
222             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
223             this.comparator = (Comparator<? super E>) ss.comparator();
224             heapify = false;
225         }
226         else if (c instanceof PriorityBlockingQueue<?>) {
227             PriorityBlockingQueue<? extends E> pq =
228                 (PriorityBlockingQueue<? extends E>) c;
229             this.comparator = (Comparator<? super E>) pq.comparator();
230             screen = false;
231             if (pq.getClass() == PriorityBlockingQueue.class) // exact match
232                 heapify = false;
233         }
234         Object[] a = c.toArray();
235         int n = a.length;
236         // If c.toArray incorrectly doesn't return Object[], copy it.
237         if (a.getClass() != Object[].class)
238             a = Arrays.copyOf(a, n, Object[].class);
239         if (screen && (n == 1 || this.comparator != null)) {
240             for (int i = 0; i < n; ++i)
241                 if (a[i] == null)
242                     throw new NullPointerException();
243         }
244         this.queue = a;
245         this.size = n;
246         if (heapify)
247             heapify();
248     }
249 
250     /**
251      * Tries to grow array to accommodate at least one more element
252      * (but normally expand by about 50%), giving up (allowing retry)
253      * on contention (which we expect to be rare). Call only while
254      * holding lock.
255      *
256      * @param array the heap array
257      * @param oldCap the length of the array
258      */
tryGrow(Object[] array, int oldCap)259     private void tryGrow(Object[] array, int oldCap) {
260         lock.unlock(); // must release and then re-acquire main lock
261         Object[] newArray = null;
262         if (allocationSpinLock == 0 &&
263             U.compareAndSwapInt(this, ALLOCATIONSPINLOCK, 0, 1)) {
264             try {
265                 int newCap = oldCap + ((oldCap < 64) ?
266                                        (oldCap + 2) : // grow faster if small
267                                        (oldCap >> 1));
268                 if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
269                     int minCap = oldCap + 1;
270                     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
271                         throw new OutOfMemoryError();
272                     newCap = MAX_ARRAY_SIZE;
273                 }
274                 if (newCap > oldCap && queue == array)
275                     newArray = new Object[newCap];
276             } finally {
277                 allocationSpinLock = 0;
278             }
279         }
280         if (newArray == null) // back off if another thread is allocating
281             Thread.yield();
282         lock.lock();
283         if (newArray != null && queue == array) {
284             queue = newArray;
285             System.arraycopy(array, 0, newArray, 0, oldCap);
286         }
287     }
288 
289     /**
290      * Mechanics for poll().  Call only while holding lock.
291      */
dequeue()292     private E dequeue() {
293         int n = size - 1;
294         if (n < 0)
295             return null;
296         else {
297             Object[] array = queue;
298             E result = (E) array[0];
299             E x = (E) array[n];
300             array[n] = null;
301             Comparator<? super E> cmp = comparator;
302             if (cmp == null)
303                 siftDownComparable(0, x, array, n);
304             else
305                 siftDownUsingComparator(0, x, array, n, cmp);
306             size = n;
307             return result;
308         }
309     }
310 
311     /**
312      * Inserts item x at position k, maintaining heap invariant by
313      * promoting x up the tree until it is greater than or equal to
314      * its parent, or is the root.
315      *
316      * To simplify and speed up coercions and comparisons. the
317      * Comparable and Comparator versions are separated into different
318      * methods that are otherwise identical. (Similarly for siftDown.)
319      * These methods are static, with heap state as arguments, to
320      * simplify use in light of possible comparator exceptions.
321      *
322      * @param k the position to fill
323      * @param x the item to insert
324      * @param array the heap array
325      */
siftUpComparable(int k, T x, Object[] array)326     private static <T> void siftUpComparable(int k, T x, Object[] array) {
327         Comparable<? super T> key = (Comparable<? super T>) x;
328         while (k > 0) {
329             int parent = (k - 1) >>> 1;
330             Object e = array[parent];
331             if (key.compareTo((T) e) >= 0)
332                 break;
333             array[k] = e;
334             k = parent;
335         }
336         array[k] = key;
337     }
338 
siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp)339     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
340                                        Comparator<? super T> cmp) {
341         while (k > 0) {
342             int parent = (k - 1) >>> 1;
343             Object e = array[parent];
344             if (cmp.compare(x, (T) e) >= 0)
345                 break;
346             array[k] = e;
347             k = parent;
348         }
349         array[k] = x;
350     }
351 
352     /**
353      * Inserts item x at position k, maintaining heap invariant by
354      * demoting x down the tree repeatedly until it is less than or
355      * equal to its children or is a leaf.
356      *
357      * @param k the position to fill
358      * @param x the item to insert
359      * @param array the heap array
360      * @param n heap size
361      */
siftDownComparable(int k, T x, Object[] array, int n)362     private static <T> void siftDownComparable(int k, T x, Object[] array,
363                                                int n) {
364         if (n > 0) {
365             Comparable<? super T> key = (Comparable<? super T>)x;
366             int half = n >>> 1;           // loop while a non-leaf
367             while (k < half) {
368                 int child = (k << 1) + 1; // assume left child is least
369                 Object c = array[child];
370                 int right = child + 1;
371                 if (right < n &&
372                     ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
373                     c = array[child = right];
374                 if (key.compareTo((T) c) <= 0)
375                     break;
376                 array[k] = c;
377                 k = child;
378             }
379             array[k] = key;
380         }
381     }
382 
siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp)383     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
384                                                     int n,
385                                                     Comparator<? super T> cmp) {
386         if (n > 0) {
387             int half = n >>> 1;
388             while (k < half) {
389                 int child = (k << 1) + 1;
390                 Object c = array[child];
391                 int right = child + 1;
392                 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
393                     c = array[child = right];
394                 if (cmp.compare(x, (T) c) <= 0)
395                     break;
396                 array[k] = c;
397                 k = child;
398             }
399             array[k] = x;
400         }
401     }
402 
403     /**
404      * Establishes the heap invariant (described above) in the entire tree,
405      * assuming nothing about the order of the elements prior to the call.
406      */
heapify()407     private void heapify() {
408         Object[] array = queue;
409         int n = size;
410         int half = (n >>> 1) - 1;
411         Comparator<? super E> cmp = comparator;
412         if (cmp == null) {
413             for (int i = half; i >= 0; i--)
414                 siftDownComparable(i, (E) array[i], array, n);
415         }
416         else {
417             for (int i = half; i >= 0; i--)
418                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
419         }
420     }
421 
422     /**
423      * Inserts the specified element into this priority queue.
424      *
425      * @param e the element to add
426      * @return {@code true} (as specified by {@link Collection#add})
427      * @throws ClassCastException if the specified element cannot be compared
428      *         with elements currently in the priority queue according to the
429      *         priority queue's ordering
430      * @throws NullPointerException if the specified element is null
431      */
add(E e)432     public boolean add(E e) {
433         return offer(e);
434     }
435 
436     /**
437      * Inserts the specified element into this priority queue.
438      * As the queue is unbounded, this method will never return {@code false}.
439      *
440      * @param e the element to add
441      * @return {@code true} (as specified by {@link Queue#offer})
442      * @throws ClassCastException if the specified element cannot be compared
443      *         with elements currently in the priority queue according to the
444      *         priority queue's ordering
445      * @throws NullPointerException if the specified element is null
446      */
offer(E e)447     public boolean offer(E e) {
448         if (e == null)
449             throw new NullPointerException();
450         final ReentrantLock lock = this.lock;
451         lock.lock();
452         int n, cap;
453         Object[] array;
454         while ((n = size) >= (cap = (array = queue).length))
455             tryGrow(array, cap);
456         try {
457             Comparator<? super E> cmp = comparator;
458             if (cmp == null)
459                 siftUpComparable(n, e, array);
460             else
461                 siftUpUsingComparator(n, e, array, cmp);
462             size = n + 1;
463             notEmpty.signal();
464         } finally {
465             lock.unlock();
466         }
467         return true;
468     }
469 
470     /**
471      * Inserts the specified element into this priority queue.
472      * As the queue is unbounded, this method will never block.
473      *
474      * @param e the element to add
475      * @throws ClassCastException if the specified element cannot be compared
476      *         with elements currently in the priority queue according to the
477      *         priority queue's ordering
478      * @throws NullPointerException if the specified element is null
479      */
put(E e)480     public void put(E e) {
481         offer(e); // never need to block
482     }
483 
484     /**
485      * Inserts the specified element into this priority queue.
486      * As the queue is unbounded, this method will never block or
487      * return {@code false}.
488      *
489      * @param e the element to add
490      * @param timeout This parameter is ignored as the method never blocks
491      * @param unit This parameter is ignored as the method never blocks
492      * @return {@code true} (as specified by
493      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
494      * @throws ClassCastException if the specified element cannot be compared
495      *         with elements currently in the priority queue according to the
496      *         priority queue's ordering
497      * @throws NullPointerException if the specified element is null
498      */
offer(E e, long timeout, TimeUnit unit)499     public boolean offer(E e, long timeout, TimeUnit unit) {
500         return offer(e); // never need to block
501     }
502 
poll()503     public E poll() {
504         final ReentrantLock lock = this.lock;
505         lock.lock();
506         try {
507             return dequeue();
508         } finally {
509             lock.unlock();
510         }
511     }
512 
take()513     public E take() throws InterruptedException {
514         final ReentrantLock lock = this.lock;
515         lock.lockInterruptibly();
516         E result;
517         try {
518             while ( (result = dequeue()) == null)
519                 notEmpty.await();
520         } finally {
521             lock.unlock();
522         }
523         return result;
524     }
525 
poll(long timeout, TimeUnit unit)526     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
527         long nanos = unit.toNanos(timeout);
528         final ReentrantLock lock = this.lock;
529         lock.lockInterruptibly();
530         E result;
531         try {
532             while ( (result = dequeue()) == null && nanos > 0)
533                 nanos = notEmpty.awaitNanos(nanos);
534         } finally {
535             lock.unlock();
536         }
537         return result;
538     }
539 
peek()540     public E peek() {
541         final ReentrantLock lock = this.lock;
542         lock.lock();
543         try {
544             return (size == 0) ? null : (E) queue[0];
545         } finally {
546             lock.unlock();
547         }
548     }
549 
550     /**
551      * Returns the comparator used to order the elements in this queue,
552      * or {@code null} if this queue uses the {@linkplain Comparable
553      * natural ordering} of its elements.
554      *
555      * @return the comparator used to order the elements in this queue,
556      *         or {@code null} if this queue uses the natural
557      *         ordering of its elements
558      */
comparator()559     public Comparator<? super E> comparator() {
560         return comparator;
561     }
562 
size()563     public int size() {
564         final ReentrantLock lock = this.lock;
565         lock.lock();
566         try {
567             return size;
568         } finally {
569             lock.unlock();
570         }
571     }
572 
573     /**
574      * Always returns {@code Integer.MAX_VALUE} because
575      * a {@code PriorityBlockingQueue} is not capacity constrained.
576      * @return {@code Integer.MAX_VALUE} always
577      */
remainingCapacity()578     public int remainingCapacity() {
579         return Integer.MAX_VALUE;
580     }
581 
indexOf(Object o)582     private int indexOf(Object o) {
583         if (o != null) {
584             Object[] array = queue;
585             int n = size;
586             for (int i = 0; i < n; i++)
587                 if (o.equals(array[i]))
588                     return i;
589         }
590         return -1;
591     }
592 
593     /**
594      * Removes the ith element from queue.
595      */
removeAt(int i)596     private void removeAt(int i) {
597         Object[] array = queue;
598         int n = size - 1;
599         if (n == i) // removed last element
600             array[i] = null;
601         else {
602             E moved = (E) array[n];
603             array[n] = null;
604             Comparator<? super E> cmp = comparator;
605             if (cmp == null)
606                 siftDownComparable(i, moved, array, n);
607             else
608                 siftDownUsingComparator(i, moved, array, n, cmp);
609             if (array[i] == moved) {
610                 if (cmp == null)
611                     siftUpComparable(i, moved, array);
612                 else
613                     siftUpUsingComparator(i, moved, array, cmp);
614             }
615         }
616         size = n;
617     }
618 
619     /**
620      * Removes a single instance of the specified element from this queue,
621      * if it is present.  More formally, removes an element {@code e} such
622      * that {@code o.equals(e)}, if this queue contains one or more such
623      * elements.  Returns {@code true} if and only if this queue contained
624      * the specified element (or equivalently, if this queue changed as a
625      * result of the call).
626      *
627      * @param o element to be removed from this queue, if present
628      * @return {@code true} if this queue changed as a result of the call
629      */
remove(Object o)630     public boolean remove(Object o) {
631         final ReentrantLock lock = this.lock;
632         lock.lock();
633         try {
634             int i = indexOf(o);
635             if (i == -1)
636                 return false;
637             removeAt(i);
638             return true;
639         } finally {
640             lock.unlock();
641         }
642     }
643 
644     /**
645      * Identity-based version for use in Itr.remove.
646      */
removeEQ(Object o)647     void removeEQ(Object o) {
648         final ReentrantLock lock = this.lock;
649         lock.lock();
650         try {
651             Object[] array = queue;
652             for (int i = 0, n = size; i < n; i++) {
653                 if (o == array[i]) {
654                     removeAt(i);
655                     break;
656                 }
657             }
658         } finally {
659             lock.unlock();
660         }
661     }
662 
663     /**
664      * Returns {@code true} if this queue contains the specified element.
665      * More formally, returns {@code true} if and only if this queue contains
666      * at least one element {@code e} such that {@code o.equals(e)}.
667      *
668      * @param o object to be checked for containment in this queue
669      * @return {@code true} if this queue contains the specified element
670      */
contains(Object o)671     public boolean contains(Object o) {
672         final ReentrantLock lock = this.lock;
673         lock.lock();
674         try {
675             return indexOf(o) != -1;
676         } finally {
677             lock.unlock();
678         }
679     }
680 
toString()681     public String toString() {
682         return Helpers.collectionToString(this);
683     }
684 
685     /**
686      * @throws UnsupportedOperationException {@inheritDoc}
687      * @throws ClassCastException            {@inheritDoc}
688      * @throws NullPointerException          {@inheritDoc}
689      * @throws IllegalArgumentException      {@inheritDoc}
690      */
drainTo(Collection<? super E> c)691     public int drainTo(Collection<? super E> c) {
692         return drainTo(c, Integer.MAX_VALUE);
693     }
694 
695     /**
696      * @throws UnsupportedOperationException {@inheritDoc}
697      * @throws ClassCastException            {@inheritDoc}
698      * @throws NullPointerException          {@inheritDoc}
699      * @throws IllegalArgumentException      {@inheritDoc}
700      */
drainTo(Collection<? super E> c, int maxElements)701     public int drainTo(Collection<? super E> c, int maxElements) {
702         if (c == null)
703             throw new NullPointerException();
704         if (c == this)
705             throw new IllegalArgumentException();
706         if (maxElements <= 0)
707             return 0;
708         final ReentrantLock lock = this.lock;
709         lock.lock();
710         try {
711             int n = Math.min(size, maxElements);
712             for (int i = 0; i < n; i++) {
713                 c.add((E) queue[0]); // In this order, in case add() throws.
714                 dequeue();
715             }
716             return n;
717         } finally {
718             lock.unlock();
719         }
720     }
721 
722     /**
723      * Atomically removes all of the elements from this queue.
724      * The queue will be empty after this call returns.
725      */
clear()726     public void clear() {
727         final ReentrantLock lock = this.lock;
728         lock.lock();
729         try {
730             Object[] array = queue;
731             int n = size;
732             size = 0;
733             for (int i = 0; i < n; i++)
734                 array[i] = null;
735         } finally {
736             lock.unlock();
737         }
738     }
739 
740     /**
741      * Returns an array containing all of the elements in this queue.
742      * The returned array elements are in no particular order.
743      *
744      * <p>The returned array will be "safe" in that no references to it are
745      * maintained by this queue.  (In other words, this method must allocate
746      * a new array).  The caller is thus free to modify the returned array.
747      *
748      * <p>This method acts as bridge between array-based and collection-based
749      * APIs.
750      *
751      * @return an array containing all of the elements in this queue
752      */
toArray()753     public Object[] toArray() {
754         final ReentrantLock lock = this.lock;
755         lock.lock();
756         try {
757             return Arrays.copyOf(queue, size);
758         } finally {
759             lock.unlock();
760         }
761     }
762 
763     /**
764      * Returns an array containing all of the elements in this queue; the
765      * runtime type of the returned array is that of the specified array.
766      * The returned array elements are in no particular order.
767      * If the queue fits in the specified array, it is returned therein.
768      * Otherwise, a new array is allocated with the runtime type of the
769      * specified array and the size of this queue.
770      *
771      * <p>If this queue fits in the specified array with room to spare
772      * (i.e., the array has more elements than this queue), the element in
773      * the array immediately following the end of the queue is set to
774      * {@code null}.
775      *
776      * <p>Like the {@link #toArray()} method, this method acts as bridge between
777      * array-based and collection-based APIs.  Further, this method allows
778      * precise control over the runtime type of the output array, and may,
779      * under certain circumstances, be used to save allocation costs.
780      *
781      * <p>Suppose {@code x} is a queue known to contain only strings.
782      * The following code can be used to dump the queue into a newly
783      * allocated array of {@code String}:
784      *
785      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
786      *
787      * Note that {@code toArray(new Object[0])} is identical in function to
788      * {@code toArray()}.
789      *
790      * @param a the array into which the elements of the queue are to
791      *          be stored, if it is big enough; otherwise, a new array of the
792      *          same runtime type is allocated for this purpose
793      * @return an array containing all of the elements in this queue
794      * @throws ArrayStoreException if the runtime type of the specified array
795      *         is not a supertype of the runtime type of every element in
796      *         this queue
797      * @throws NullPointerException if the specified array is null
798      */
toArray(T[] a)799     public <T> T[] toArray(T[] a) {
800         final ReentrantLock lock = this.lock;
801         lock.lock();
802         try {
803             int n = size;
804             if (a.length < n)
805                 // Make a new array of a's runtime type, but my contents:
806                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
807             System.arraycopy(queue, 0, a, 0, n);
808             if (a.length > n)
809                 a[n] = null;
810             return a;
811         } finally {
812             lock.unlock();
813         }
814     }
815 
816     /**
817      * Returns an iterator over the elements in this queue. The
818      * iterator does not return the elements in any particular order.
819      *
820      * <p>The returned iterator is
821      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
822      *
823      * @return an iterator over the elements in this queue
824      */
iterator()825     public Iterator<E> iterator() {
826         return new Itr(toArray());
827     }
828 
829     /**
     * Snapshot iterator that works off a copy of the underlying queue array.
831      */
832     final class Itr implements Iterator<E> {
833         final Object[] array; // Array of all elements
834         int cursor;           // index of next element to return
835         int lastRet;          // index of last element, or -1 if no such
836 
Itr(Object[] array)837         Itr(Object[] array) {
838             lastRet = -1;
839             this.array = array;
840         }
841 
hasNext()842         public boolean hasNext() {
843             return cursor < array.length;
844         }
845 
next()846         public E next() {
847             if (cursor >= array.length)
848                 throw new NoSuchElementException();
849             lastRet = cursor;
850             return (E)array[cursor++];
851         }
852 
remove()853         public void remove() {
854             if (lastRet < 0)
855                 throw new IllegalStateException();
856             removeEQ(array[lastRet]);
857             lastRet = -1;
858         }
859     }
860 
861     /**
862      * Saves this queue to a stream (that is, serializes it).
863      *
     * For compatibility with previous versions of this class, elements
865      * are first copied to a java.util.PriorityQueue, which is then
866      * serialized.
867      *
868      * @param s the stream
869      * @throws java.io.IOException if an I/O error occurs
870      */
writeObject(java.io.ObjectOutputStream s)871     private void writeObject(java.io.ObjectOutputStream s)
872         throws java.io.IOException {
873         lock.lock();
874         try {
875             // avoid zero capacity argument
876             q = new PriorityQueue<E>(Math.max(size, 1), comparator);
877             q.addAll(this);
878             s.defaultWriteObject();
879         } finally {
880             q = null;
881             lock.unlock();
882         }
883     }
884 
885     /**
886      * Reconstitutes this queue from a stream (that is, deserializes it).
887      * @param s the stream
888      * @throws ClassNotFoundException if the class of a serialized object
889      *         could not be found
890      * @throws java.io.IOException if an I/O error occurs
891      */
readObject(java.io.ObjectInputStream s)892     private void readObject(java.io.ObjectInputStream s)
893         throws java.io.IOException, ClassNotFoundException {
894         try {
895             s.defaultReadObject();
896             this.queue = new Object[q.size()];
897             comparator = q.comparator();
898             addAll(q);
899         } finally {
900             q = null;
901         }
902     }
903 
904     // Similar to Collections.ArraySnapshotSpliterator but avoids
905     // commitment to toArray until needed
906     static final class PBQSpliterator<E> implements Spliterator<E> {
907         final PriorityBlockingQueue<E> queue;
908         Object[] array;
909         int index;
910         int fence;
911 
PBQSpliterator(PriorityBlockingQueue<E> queue, Object[] array, int index, int fence)912         PBQSpliterator(PriorityBlockingQueue<E> queue, Object[] array,
913                        int index, int fence) {
914             this.queue = queue;
915             this.array = array;
916             this.index = index;
917             this.fence = fence;
918         }
919 
getFence()920         final int getFence() {
921             int hi;
922             if ((hi = fence) < 0)
923                 hi = fence = (array = queue.toArray()).length;
924             return hi;
925         }
926 
trySplit()927         public PBQSpliterator<E> trySplit() {
928             int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
929             return (lo >= mid) ? null :
930                 new PBQSpliterator<E>(queue, array, lo, index = mid);
931         }
932 
933         @SuppressWarnings("unchecked")
forEachRemaining(Consumer<? super E> action)934         public void forEachRemaining(Consumer<? super E> action) {
935             Object[] a; int i, hi; // hoist accesses and checks from loop
936             if (action == null)
937                 throw new NullPointerException();
938             if ((a = array) == null)
939                 fence = (a = queue.toArray()).length;
940             if ((hi = fence) <= a.length &&
941                 (i = index) >= 0 && i < (index = hi)) {
942                 do { action.accept((E)a[i]); } while (++i < hi);
943             }
944         }
945 
tryAdvance(Consumer<? super E> action)946         public boolean tryAdvance(Consumer<? super E> action) {
947             if (action == null)
948                 throw new NullPointerException();
949             if (getFence() > index && index >= 0) {
950                 @SuppressWarnings("unchecked") E e = (E) array[index++];
951                 action.accept(e);
952                 return true;
953             }
954             return false;
955         }
956 
estimateSize()957         public long estimateSize() { return (long)(getFence() - index); }
958 
characteristics()959         public int characteristics() {
960             return Spliterator.NONNULL | Spliterator.SIZED | Spliterator.SUBSIZED;
961         }
962     }
963 
964     /**
965      * Returns a {@link Spliterator} over the elements in this queue.
966      *
967      * <p>The returned spliterator is
968      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
969      *
970      * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
971      * {@link Spliterator#NONNULL}.
972      *
973      * @implNote
974      * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
975      *
976      * @return a {@code Spliterator} over the elements in this queue
977      * @since 1.8
978      */
spliterator()979     public Spliterator<E> spliterator() {
980         return new PBQSpliterator<E>(this, null, 0, -1);
981     }
982 
983     // Unsafe mechanics
984     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
985     private static final long ALLOCATIONSPINLOCK;
986     static {
987         try {
988             ALLOCATIONSPINLOCK = U.objectFieldOffset
989                 (PriorityBlockingQueue.class.getDeclaredField("allocationSpinLock"));
990         } catch (ReflectiveOperationException e) {
991             throw new Error(e);
992         }
993     }
994 }
995