• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.AbstractQueue;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.Comparator;
44 import java.util.Iterator;
45 import java.util.NoSuchElementException;
46 import java.util.Objects;
47 import java.util.PriorityQueue;
48 import java.util.Queue;
49 import java.util.SortedSet;
50 import java.util.Spliterator;
51 import java.util.concurrent.locks.Condition;
52 import java.util.concurrent.locks.ReentrantLock;
53 import java.util.function.Consumer;
54 import java.util.function.Predicate;
55 import jdk.internal.misc.SharedSecrets;
56 
57 /**
58  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
59  * the same ordering rules as class {@link PriorityQueue} and supplies
60  * blocking retrieval operations.  While this queue is logically
61  * unbounded, attempted additions may fail due to resource exhaustion
62  * (causing {@code OutOfMemoryError}). This class does not permit
63  * {@code null} elements.  A priority queue relying on {@linkplain
64  * Comparable natural ordering} also does not permit insertion of
65  * non-comparable objects (doing so results in
66  * {@code ClassCastException}).
67  *
68  * <p>This class and its iterator implement all of the <em>optional</em>
69  * methods of the {@link Collection} and {@link Iterator} interfaces.
70  * The Iterator provided in method {@link #iterator()} and the
71  * Spliterator provided in method {@link #spliterator()} are <em>not</em>
72  * guaranteed to traverse the elements of the PriorityBlockingQueue in
73  * any particular order. If you need ordered traversal, consider using
74  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo} can
75  * be used to <em>remove</em> some or all elements in priority order and
76  * place them in another collection.
77  *
78  * <p>Operations on this class make no guarantees about the ordering
79  * of elements with equal priority. If you need to enforce an
80  * ordering, you can define custom classes or comparators that use a
81  * secondary key to break ties in primary priority values.  For
82  * example, here is a class that applies first-in-first-out
83  * tie-breaking to comparable elements. To use it, you would insert a
84  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
85  *
86  * <pre> {@code
87  * class FIFOEntry<E extends Comparable<? super E>>
88  *     implements Comparable<FIFOEntry<E>> {
89  *   static final AtomicLong seq = new AtomicLong(0);
90  *   final long seqNum;
91  *   final E entry;
92  *   public FIFOEntry(E entry) {
93  *     seqNum = seq.getAndIncrement();
94  *     this.entry = entry;
95  *   }
96  *   public E getEntry() { return entry; }
97  *   public int compareTo(FIFOEntry<E> other) {
98  *     int res = entry.compareTo(other.entry);
99  *     if (res == 0 && other.entry != this.entry)
100  *       res = (seqNum < other.seqNum ? -1 : 1);
101  *     return res;
102  *   }
103  * }}</pre>
104  *
105  * <p>This class is a member of the
106  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
107  * Java Collections Framework</a>.
108  *
109  * @since 1.5
110  * @author Doug Lea
111  * @param <E> the type of elements held in this queue
112  */
113 @SuppressWarnings("unchecked")
114 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
115     implements BlockingQueue<E>, java.io.Serializable {
116     private static final long serialVersionUID = 5595510919245408276L;
117 
118     /*
119      * The implementation uses an array-based binary heap, with public
120      * operations protected with a single lock. However, allocation
121      * during resizing uses a simple spinlock (used only while not
122      * holding main lock) in order to allow takes to operate
123      * concurrently with allocation.  This avoids repeated
124      * postponement of waiting consumers and consequent element
125      * build-up. The need to back away from lock during allocation
126      * makes it impossible to simply wrap delegated
127      * java.util.PriorityQueue operations within a lock, as was done
128      * in a previous version of this class. To maintain
129      * interoperability, a plain PriorityQueue is still used during
130      * serialization, which maintains compatibility at the expense of
131      * transiently doubling overhead.
132      */
133 
134     /**
135      * Default array capacity.
136      */
137     private static final int DEFAULT_INITIAL_CAPACITY = 11;
138 
139     /**
140      * The maximum size of array to allocate.
141      * Some VMs reserve some header words in an array.
142      * Attempts to allocate larger arrays may result in
143      * OutOfMemoryError: Requested array size exceeds VM limit
144      */
145     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
146 
147     /**
148      * Priority queue represented as a balanced binary heap: the two
149      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
150      * priority queue is ordered by comparator, or by the elements'
151      * natural ordering, if comparator is null: For each node n in the
152      * heap and each descendant d of n, n <= d.  The element with the
153      * lowest value is in queue[0], assuming the queue is nonempty.
154      */
155     private transient Object[] queue;
156 
157     /**
158      * The number of elements in the priority queue.
159      */
160     private transient int size;
161 
162     /**
163      * The comparator, or null if priority queue uses elements'
164      * natural ordering.
165      */
166     private transient Comparator<? super E> comparator;
167 
168     /**
169      * Lock used for all public operations.
170      */
171     private final ReentrantLock lock = new ReentrantLock();
172 
173     /**
174      * Condition for blocking when empty.
175      */
176     private final Condition notEmpty = lock.newCondition();
177 
178     /**
179      * Spinlock for allocation, acquired via CAS.
180      */
181     private transient volatile int allocationSpinLock;
182 
183     /**
184      * A plain PriorityQueue used only for serialization,
185      * to maintain compatibility with previous versions
186      * of this class. Non-null only during serialization/deserialization.
187      */
188     private PriorityQueue<E> q;
189 
190     /**
191      * Creates a {@code PriorityBlockingQueue} with the default
192      * initial capacity (11) that orders its elements according to
193      * their {@linkplain Comparable natural ordering}.
194      */
public PriorityBlockingQueue() {
    // Delegates to the two-argument constructor; a null comparator selects
    // the elements' natural ordering.
    this(DEFAULT_INITIAL_CAPACITY, null);
}
198 
199     /**
200      * Creates a {@code PriorityBlockingQueue} with the specified
201      * initial capacity that orders its elements according to their
202      * {@linkplain Comparable natural ordering}.
203      *
204      * @param initialCapacity the initial capacity for this priority queue
205      * @throws IllegalArgumentException if {@code initialCapacity} is less
206      *         than 1
207      */
public PriorityBlockingQueue(int initialCapacity) {
    // Capacity validation happens in the two-argument constructor; the null
    // comparator selects the elements' natural ordering.
    this(initialCapacity, null);
}
211 
212     /**
213      * Creates a {@code PriorityBlockingQueue} with the specified initial
214      * capacity that orders its elements according to the specified
215      * comparator.
216      *
217      * @param initialCapacity the initial capacity for this priority queue
218      * @param  comparator the comparator that will be used to order this
219      *         priority queue.  If {@code null}, the {@linkplain Comparable
220      *         natural ordering} of the elements will be used.
221      * @throws IllegalArgumentException if {@code initialCapacity} is less
222      *         than 1
223      */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    // Reject non-positive capacities up front so the backing array always
    // has a slot 0, which peek() and dequeue() read unconditionally.
    if (initialCapacity < 1)
        throw new IllegalArgumentException(
            "initialCapacity < 1: " + initialCapacity);
    this.comparator = comparator;
    // The guard above already guarantees initialCapacity >= 1, so the
    // requested capacity can be used as-is without clamping.
    this.queue = new Object[initialCapacity];
}
231 
232     /**
233      * Creates a {@code PriorityBlockingQueue} containing the elements
234      * in the specified collection.  If the specified collection is a
235      * {@link SortedSet} or a {@link PriorityQueue}, this
236      * priority queue will be ordered according to the same ordering.
237      * Otherwise, this priority queue will be ordered according to the
238      * {@linkplain Comparable natural ordering} of its elements.
239      *
240      * @param  c the collection whose elements are to be placed
241      *         into this priority queue
242      * @throws ClassCastException if elements of the specified collection
243      *         cannot be compared to one another according to the priority
244      *         queue's ordering
245      * @throws NullPointerException if the specified collection or any
246      *         of its elements are null
247      */
PriorityBlockingQueue(Collection<? extends E> c)248     public PriorityBlockingQueue(Collection<? extends E> c) {
249         boolean heapify = true; // true if not known to be in heap order
250         boolean screen = true;  // true if must screen for nulls
251         if (c instanceof SortedSet<?>) {
252             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
253             this.comparator = (Comparator<? super E>) ss.comparator();
254             heapify = false;
255         }
256         else if (c instanceof PriorityBlockingQueue<?>) {
257             PriorityBlockingQueue<? extends E> pq =
258                 (PriorityBlockingQueue<? extends E>) c;
259             this.comparator = (Comparator<? super E>) pq.comparator();
260             screen = false;
261             if (pq.getClass() == PriorityBlockingQueue.class) // exact match
262                 heapify = false;
263         }
264         Object[] es = c.toArray();
265         int n = es.length;
266         // Android-changed: Defend against c.toArray (incorrectly) not returning Object[]
267         //                  (see b/204397945)
268         // if (c.getClass() != java.util.ArrayList.class)
269         if (es.getClass() != Object[].class)
270             es = Arrays.copyOf(es, n, Object[].class);
271         if (screen && (n == 1 || this.comparator != null)) {
272             for (Object e : es)
273                 if (e == null)
274                     throw new NullPointerException();
275         }
276         this.queue = ensureNonEmpty(es);
277         this.size = n;
278         if (heapify)
279             heapify();
280     }
281 
282     /** Ensures that queue[0] exists, helping peek() and poll(). */
ensureNonEmpty(Object[] es)283     private static Object[] ensureNonEmpty(Object[] es) {
284         return (es.length > 0) ? es : new Object[1];
285     }
286 
287     /**
288      * Tries to grow array to accommodate at least one more element
289      * (but normally expand by about 50%), giving up (allowing retry)
290      * on contention (which we expect to be rare). Call only while
291      * holding lock.
292      *
293      * @param array the heap array
294      * @param oldCap the length of the array
295      */
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // Read the spinlock field first and only attempt the CAS when it looks
    // free. NOTE(review): ALLOCATIONSPINLOCK is declared outside this view;
    // presumably a VarHandle on allocationSpinLock - confirm.
    if (allocationSpinLock == 0 &&
        ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                // Fall back to the minimum growth of one slot; give up
                // if even that exceeds the maximum array size.
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // Allocate only if the heap array has not been replaced in
            // the meantime.
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // Always release the spinlock, even when allocation throws.
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    // Install the new array only if this thread allocated one and the heap
    // array is still the one it was sized from; otherwise the caller's
    // loop re-checks capacity and retries.
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
325 
326     /**
327      * Mechanics for poll().  Call only while holding lock.
328      */
dequeue()329     private E dequeue() {
330         // assert lock.isHeldByCurrentThread();
331         final Object[] es;
332         final E result;
333 
334         if ((result = (E) ((es = queue)[0])) != null) {
335             final int n;
336             final E x = (E) es[(n = --size)];
337             es[n] = null;
338             if (n > 0) {
339                 final Comparator<? super E> cmp;
340                 if ((cmp = comparator) == null)
341                     siftDownComparable(0, x, es, n);
342                 else
343                     siftDownUsingComparator(0, x, es, n, cmp);
344             }
345         }
346         return result;
347     }
348 
349     /**
350      * Inserts item x at position k, maintaining heap invariant by
351      * promoting x up the tree until it is greater than or equal to
352      * its parent, or is the root.
353      *
354      * To simplify and speed up coercions and comparisons, the
355      * Comparable and Comparator versions are separated into different
356      * methods that are otherwise identical. (Similarly for siftDown.)
357      *
358      * @param k the position to fill
359      * @param x the item to insert
360      * @param es the heap array
361      */
siftUpComparable(int k, T x, Object[] es)362     private static <T> void siftUpComparable(int k, T x, Object[] es) {
363         Comparable<? super T> key = (Comparable<? super T>) x;
364         while (k > 0) {
365             int parent = (k - 1) >>> 1;
366             Object e = es[parent];
367             if (key.compareTo((T) e) >= 0)
368                 break;
369             es[k] = e;
370             k = parent;
371         }
372         es[k] = key;
373     }
374 
siftUpUsingComparator( int k, T x, Object[] es, Comparator<? super T> cmp)375     private static <T> void siftUpUsingComparator(
376         int k, T x, Object[] es, Comparator<? super T> cmp) {
377         while (k > 0) {
378             int parent = (k - 1) >>> 1;
379             Object e = es[parent];
380             if (cmp.compare(x, (T) e) >= 0)
381                 break;
382             es[k] = e;
383             k = parent;
384         }
385         es[k] = x;
386     }
387 
388     /**
389      * Inserts item x at position k, maintaining heap invariant by
390      * demoting x down the tree repeatedly until it is less than or
391      * equal to its children or is a leaf.
392      *
393      * @param k the position to fill
394      * @param x the item to insert
395      * @param es the heap array
396      * @param n heap size
397      */
siftDownComparable(int k, T x, Object[] es, int n)398     private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
399         // assert n > 0;
400         Comparable<? super T> key = (Comparable<? super T>)x;
401         int half = n >>> 1;           // loop while a non-leaf
402         while (k < half) {
403             int child = (k << 1) + 1; // assume left child is least
404             Object c = es[child];
405             int right = child + 1;
406             if (right < n &&
407                 ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
408                 c = es[child = right];
409             if (key.compareTo((T) c) <= 0)
410                 break;
411             es[k] = c;
412             k = child;
413         }
414         es[k] = key;
415     }
416 
siftDownUsingComparator( int k, T x, Object[] es, int n, Comparator<? super T> cmp)417     private static <T> void siftDownUsingComparator(
418         int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
419         // assert n > 0;
420         int half = n >>> 1;
421         while (k < half) {
422             int child = (k << 1) + 1;
423             Object c = es[child];
424             int right = child + 1;
425             if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
426                 c = es[child = right];
427             if (cmp.compare(x, (T) c) <= 0)
428                 break;
429             es[k] = c;
430             k = child;
431         }
432         es[k] = x;
433     }
434 
435     /**
436      * Establishes the heap invariant (described above) in the entire tree,
437      * assuming nothing about the order of the elements prior to the call.
438      * This classic algorithm due to Floyd (1964) is known to be O(size).
439      */
heapify()440     private void heapify() {
441         final Object[] es = queue;
442         int n = size, i = (n >>> 1) - 1;
443         final Comparator<? super E> cmp;
444         if ((cmp = comparator) == null)
445             for (; i >= 0; i--)
446                 siftDownComparable(i, (E) es[i], es, n);
447         else
448             for (; i >= 0; i--)
449                 siftDownUsingComparator(i, (E) es[i], es, n, cmp);
450     }
451 
452     /**
453      * Inserts the specified element into this priority queue.
454      *
455      * @param e the element to add
456      * @return {@code true} (as specified by {@link Collection#add})
457      * @throws ClassCastException if the specified element cannot be compared
458      *         with elements currently in the priority queue according to the
459      *         priority queue's ordering
460      * @throws NullPointerException if the specified element is null
461      */
public boolean add(E e) {
    // Same as offer(e); null checking and insertion are handled there.
    return offer(e);
}
465 
466     /**
467      * Inserts the specified element into this priority queue.
468      * As the queue is unbounded, this method will never return {@code false}.
469      *
470      * @param e the element to add
471      * @return {@code true} (as specified by {@link Queue#offer})
472      * @throws ClassCastException if the specified element cannot be compared
473      *         with elements currently in the priority queue according to the
474      *         priority queue's ordering
475      * @throws NullPointerException if the specified element is null
476      */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] es;
    // Grow until there is a free slot. tryGrow releases and re-acquires
    // the lock, so size and queue are re-read on every pass. The loop sits
    // outside the try block below.
    while ((n = size) >= (cap = (es = queue).length))
        tryGrow(es, cap);
    try {
        final Comparator<? super E> cmp;
        // Insert at the first free slot and sift up toward the root.
        if ((cmp = comparator) == null)
            siftUpComparable(n, e, es);
        else
            siftUpUsingComparator(n, e, es, cmp);
        // size is only bumped after the sift-up completed.
        size = n + 1;
        // Wake one thread waiting on notEmpty (take / timed poll).
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
499 
500     /**
501      * Inserts the specified element into this priority queue.
502      * As the queue is unbounded, this method will never block.
503      *
504      * @param e the element to add
505      * @throws ClassCastException if the specified element cannot be compared
506      *         with elements currently in the priority queue according to the
507      *         priority queue's ordering
508      * @throws NullPointerException if the specified element is null
509      */
public void put(E e) {
    // The boolean result of offer is intentionally discarded.
    offer(e); // never need to block
}
513 
514     /**
515      * Inserts the specified element into this priority queue.
516      * As the queue is unbounded, this method will never block or
517      * return {@code false}.
518      *
519      * @param e the element to add
520      * @param timeout This parameter is ignored as the method never blocks
521      * @param unit This parameter is ignored as the method never blocks
522      * @return {@code true} (as specified by
523      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
524      * @throws ClassCastException if the specified element cannot be compared
525      *         with elements currently in the priority queue according to the
526      *         priority queue's ordering
527      * @throws NullPointerException if the specified element is null
528      */
public boolean offer(E e, long timeout, TimeUnit unit) {
    // timeout and unit are never read; the call is forwarded unchanged.
    return offer(e); // never need to block
}
532 
poll()533     public E poll() {
534         final ReentrantLock lock = this.lock;
535         lock.lock();
536         try {
537             return dequeue();
538         } finally {
539             lock.unlock();
540         }
541     }
542 
take()543     public E take() throws InterruptedException {
544         final ReentrantLock lock = this.lock;
545         lock.lockInterruptibly();
546         E result;
547         try {
548             while ( (result = dequeue()) == null)
549                 notEmpty.await();
550         } finally {
551             lock.unlock();
552         }
553         return result;
554     }
555 
poll(long timeout, TimeUnit unit)556     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
557         long nanos = unit.toNanos(timeout);
558         final ReentrantLock lock = this.lock;
559         lock.lockInterruptibly();
560         E result;
561         try {
562             while ( (result = dequeue()) == null && nanos > 0)
563                 nanos = notEmpty.awaitNanos(nanos);
564         } finally {
565             lock.unlock();
566         }
567         return result;
568     }
569 
peek()570     public E peek() {
571         final ReentrantLock lock = this.lock;
572         lock.lock();
573         try {
574             return (E) queue[0];
575         } finally {
576             lock.unlock();
577         }
578     }
579 
580     /**
581      * Returns the comparator used to order the elements in this queue,
582      * or {@code null} if this queue uses the {@linkplain Comparable
583      * natural ordering} of its elements.
584      *
585      * @return the comparator used to order the elements in this queue,
586      *         or {@code null} if this queue uses the natural
587      *         ordering of its elements
588      */
public Comparator<? super E> comparator() {
    // Plain field read; the main lock is not acquired here.
    return comparator;
}
592 
size()593     public int size() {
594         final ReentrantLock lock = this.lock;
595         lock.lock();
596         try {
597             return size;
598         } finally {
599             lock.unlock();
600         }
601     }
602 
603     /**
604      * Always returns {@code Integer.MAX_VALUE} because
605      * a {@code PriorityBlockingQueue} is not capacity constrained.
606      * @return {@code Integer.MAX_VALUE} always
607      */
public int remainingCapacity() {
    // Constant result; no queue state is consulted.
    return Integer.MAX_VALUE;
}
611 
indexOf(Object o)612     private int indexOf(Object o) {
613         if (o != null) {
614             final Object[] es = queue;
615             for (int i = 0, n = size; i < n; i++)
616                 if (o.equals(es[i]))
617                     return i;
618         }
619         return -1;
620     }
621 
622     /**
623      * Removes the ith element from queue.
624      */
removeAt(int i)625     private void removeAt(int i) {
626         final Object[] es = queue;
627         final int n = size - 1;
628         if (n == i) // removed last element
629             es[i] = null;
630         else {
631             E moved = (E) es[n];
632             es[n] = null;
633             final Comparator<? super E> cmp;
634             if ((cmp = comparator) == null)
635                 siftDownComparable(i, moved, es, n);
636             else
637                 siftDownUsingComparator(i, moved, es, n, cmp);
638             if (es[i] == moved) {
639                 if (cmp == null)
640                     siftUpComparable(i, moved, es);
641                 else
642                     siftUpUsingComparator(i, moved, es, cmp);
643             }
644         }
645         size = n;
646     }
647 
648     /**
649      * Removes a single instance of the specified element from this queue,
650      * if it is present.  More formally, removes an element {@code e} such
651      * that {@code o.equals(e)}, if this queue contains one or more such
652      * elements.  Returns {@code true} if and only if this queue contained
653      * the specified element (or equivalently, if this queue changed as a
654      * result of the call).
655      *
656      * @param o element to be removed from this queue, if present
657      * @return {@code true} if this queue changed as a result of the call
658      */
remove(Object o)659     public boolean remove(Object o) {
660         final ReentrantLock lock = this.lock;
661         lock.lock();
662         try {
663             int i = indexOf(o);
664             if (i == -1)
665                 return false;
666             removeAt(i);
667             return true;
668         } finally {
669             lock.unlock();
670         }
671     }
672 
673     /**
674      * Identity-based version for use in Itr.remove.
675      *
676      * @param o element to be removed from this queue, if present
677      */
removeEq(Object o)678     void removeEq(Object o) {
679         final ReentrantLock lock = this.lock;
680         lock.lock();
681         try {
682             final Object[] es = queue;
683             for (int i = 0, n = size; i < n; i++) {
684                 if (o == es[i]) {
685                     removeAt(i);
686                     break;
687                 }
688             }
689         } finally {
690             lock.unlock();
691         }
692     }
693 
694     /**
695      * Returns {@code true} if this queue contains the specified element.
696      * More formally, returns {@code true} if and only if this queue contains
697      * at least one element {@code e} such that {@code o.equals(e)}.
698      *
699      * @param o object to be checked for containment in this queue
700      * @return {@code true} if this queue contains the specified element
701      */
contains(Object o)702     public boolean contains(Object o) {
703         final ReentrantLock lock = this.lock;
704         lock.lock();
705         try {
706             return indexOf(o) != -1;
707         } finally {
708             lock.unlock();
709         }
710     }
711 
/**
 * Returns a string representation of this queue, produced by
 * Helpers.collectionToString.
 */
public String toString() {
    // NOTE(review): Helpers is declared outside this view; its exact
    // output format and locking behavior cannot be confirmed from here.
    return Helpers.collectionToString(this);
}
715 
716     /**
717      * @throws UnsupportedOperationException {@inheritDoc}
718      * @throws ClassCastException            {@inheritDoc}
719      * @throws NullPointerException          {@inheritDoc}
720      * @throws IllegalArgumentException      {@inheritDoc}
721      */
public int drainTo(Collection<? super E> c) {
    // No element limit: delegate with the largest possible maximum.
    return drainTo(c, Integer.MAX_VALUE);
}
725 
726     /**
727      * @throws UnsupportedOperationException {@inheritDoc}
728      * @throws ClassCastException            {@inheritDoc}
729      * @throws NullPointerException          {@inheritDoc}
730      * @throws IllegalArgumentException      {@inheritDoc}
731      */
drainTo(Collection<? super E> c, int maxElements)732     public int drainTo(Collection<? super E> c, int maxElements) {
733         Objects.requireNonNull(c);
734         if (c == this)
735             throw new IllegalArgumentException();
736         if (maxElements <= 0)
737             return 0;
738         final ReentrantLock lock = this.lock;
739         lock.lock();
740         try {
741             int n = Math.min(size, maxElements);
742             for (int i = 0; i < n; i++) {
743                 c.add((E) queue[0]); // In this order, in case add() throws.
744                 dequeue();
745             }
746             return n;
747         } finally {
748             lock.unlock();
749         }
750     }
751 
752     /**
753      * Atomically removes all of the elements from this queue.
754      * The queue will be empty after this call returns.
755      */
clear()756     public void clear() {
757         final ReentrantLock lock = this.lock;
758         lock.lock();
759         try {
760             final Object[] es = queue;
761             for (int i = 0, n = size; i < n; i++)
762                 es[i] = null;
763             size = 0;
764         } finally {
765             lock.unlock();
766         }
767     }
768 
769     /**
770      * Returns an array containing all of the elements in this queue.
771      * The returned array elements are in no particular order.
772      *
773      * <p>The returned array will be "safe" in that no references to it are
774      * maintained by this queue.  (In other words, this method must allocate
775      * a new array).  The caller is thus free to modify the returned array.
776      *
777      * <p>This method acts as bridge between array-based and collection-based
778      * APIs.
779      *
780      * @return an array containing all of the elements in this queue
781      */
toArray()782     public Object[] toArray() {
783         final ReentrantLock lock = this.lock;
784         lock.lock();
785         try {
786             return Arrays.copyOf(queue, size);
787         } finally {
788             lock.unlock();
789         }
790     }
791 
792     /**
793      * Returns an array containing all of the elements in this queue; the
794      * runtime type of the returned array is that of the specified array.
795      * The returned array elements are in no particular order.
796      * If the queue fits in the specified array, it is returned therein.
797      * Otherwise, a new array is allocated with the runtime type of the
798      * specified array and the size of this queue.
799      *
800      * <p>If this queue fits in the specified array with room to spare
801      * (i.e., the array has more elements than this queue), the element in
802      * the array immediately following the end of the queue is set to
803      * {@code null}.
804      *
805      * <p>Like the {@link #toArray()} method, this method acts as bridge between
806      * array-based and collection-based APIs.  Further, this method allows
807      * precise control over the runtime type of the output array, and may,
808      * under certain circumstances, be used to save allocation costs.
809      *
810      * <p>Suppose {@code x} is a queue known to contain only strings.
811      * The following code can be used to dump the queue into a newly
812      * allocated array of {@code String}:
813      *
814      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
815      *
816      * Note that {@code toArray(new Object[0])} is identical in function to
817      * {@code toArray()}.
818      *
819      * @param a the array into which the elements of the queue are to
820      *          be stored, if it is big enough; otherwise, a new array of the
821      *          same runtime type is allocated for this purpose
822      * @return an array containing all of the elements in this queue
823      * @throws ArrayStoreException if the runtime type of the specified array
824      *         is not a supertype of the runtime type of every element in
825      *         this queue
826      * @throws NullPointerException if the specified array is null
827      */
toArray(T[] a)828     public <T> T[] toArray(T[] a) {
829         final ReentrantLock lock = this.lock;
830         lock.lock();
831         try {
832             int n = size;
833             if (a.length < n)
834                 // Make a new array of a's runtime type, but my contents:
835                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
836             System.arraycopy(queue, 0, a, 0, n);
837             if (a.length > n)
838                 a[n] = null;
839             return a;
840         } finally {
841             lock.unlock();
842         }
843     }
844 
845     /**
846      * Returns an iterator over the elements in this queue. The
847      * iterator does not return the elements in any particular order.
848      *
849      * <p>The returned iterator is
850      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
851      *
852      * @return an iterator over the elements in this queue
853      */
iterator()854     public Iterator<E> iterator() {
855         return new Itr(toArray());
856     }
857 
858     /**
859      * Snapshot iterator that works off copy of underlying q array.
860      */
861     final class Itr implements Iterator<E> {
862         final Object[] array; // Array of all elements
863         int cursor;           // index of next element to return
864         int lastRet = -1;     // index of last element, or -1 if no such
865 
Itr(Object[] array)866         Itr(Object[] array) {
867             this.array = array;
868         }
869 
hasNext()870         public boolean hasNext() {
871             return cursor < array.length;
872         }
873 
next()874         public E next() {
875             if (cursor >= array.length)
876                 throw new NoSuchElementException();
877             return (E)array[lastRet = cursor++];
878         }
879 
remove()880         public void remove() {
881             if (lastRet < 0)
882                 throw new IllegalStateException();
883             removeEq(array[lastRet]);
884             lastRet = -1;
885         }
886 
forEachRemaining(Consumer<? super E> action)887         public void forEachRemaining(Consumer<? super E> action) {
888             Objects.requireNonNull(action);
889             final Object[] es = array;
890             int i;
891             if ((i = cursor) < es.length) {
892                 lastRet = -1;
893                 cursor = es.length;
894                 for (; i < es.length; i++)
895                     action.accept((E) es[i]);
896                 lastRet = es.length - 1;
897             }
898         }
899     }
900 
901     /**
902      * Saves this queue to a stream (that is, serializes it).
903      *
904      * For compatibility with previous version of this class, elements
905      * are first copied to a java.util.PriorityQueue, which is then
906      * serialized.
907      *
908      * @param s the stream
909      * @throws java.io.IOException if an I/O error occurs
910      */
writeObject(java.io.ObjectOutputStream s)911     private void writeObject(java.io.ObjectOutputStream s)
912         throws java.io.IOException {
913         lock.lock();
914         try {
915             // avoid zero capacity argument
916             q = new PriorityQueue<E>(Math.max(size, 1), comparator);
917             q.addAll(this);
918             s.defaultWriteObject();
919         } finally {
920             q = null;
921             lock.unlock();
922         }
923     }
924 
925     /**
926      * Reconstitutes this queue from a stream (that is, deserializes it).
927      * @param s the stream
928      * @throws ClassNotFoundException if the class of a serialized object
929      *         could not be found
930      * @throws java.io.IOException if an I/O error occurs
931      */
readObject(java.io.ObjectInputStream s)932     private void readObject(java.io.ObjectInputStream s)
933         throws java.io.IOException, ClassNotFoundException {
934         try {
935             s.defaultReadObject();
936             int sz = q.size();
937             SharedSecrets.getJavaObjectInputStreamAccess().checkArray(s, Object[].class, sz);
938             this.queue = new Object[Math.max(1, sz)];
939             comparator = q.comparator();
940             addAll(q);
941         } finally {
942             q = null;
943         }
944     }
945 
946     /**
947      * Immutable snapshot spliterator that binds to elements "late".
948      */
949     final class PBQSpliterator implements Spliterator<E> {
950         Object[] array;        // null until late-bound-initialized
951         int index;
952         int fence;
953 
PBQSpliterator()954         PBQSpliterator() {}
955 
PBQSpliterator(Object[] array, int index, int fence)956         PBQSpliterator(Object[] array, int index, int fence) {
957             this.array = array;
958             this.index = index;
959             this.fence = fence;
960         }
961 
getFence()962         private int getFence() {
963             if (array == null)
964                 fence = (array = toArray()).length;
965             return fence;
966         }
967 
trySplit()968         public PBQSpliterator trySplit() {
969             int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
970             return (lo >= mid) ? null :
971                 new PBQSpliterator(array, lo, index = mid);
972         }
973 
forEachRemaining(Consumer<? super E> action)974         public void forEachRemaining(Consumer<? super E> action) {
975             Objects.requireNonNull(action);
976             final int hi = getFence(), lo = index;
977             final Object[] es = array;
978             index = hi;                 // ensure exhaustion
979             for (int i = lo; i < hi; i++)
980                 action.accept((E) es[i]);
981         }
982 
tryAdvance(Consumer<? super E> action)983         public boolean tryAdvance(Consumer<? super E> action) {
984             Objects.requireNonNull(action);
985             if (getFence() > index && index >= 0) {
986                 action.accept((E) array[index++]);
987                 return true;
988             }
989             return false;
990         }
991 
estimateSize()992         public long estimateSize() { return getFence() - index; }
993 
characteristics()994         public int characteristics() {
995             return (Spliterator.NONNULL |
996                     Spliterator.SIZED |
997                     Spliterator.SUBSIZED);
998         }
999     }
1000 
1001     /**
1002      * Returns a {@link Spliterator} over the elements in this queue.
1003      * The spliterator does not traverse elements in any particular order
1004      * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
1005      *
1006      * <p>The returned spliterator is
1007      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1008      *
1009      * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
1010      * {@link Spliterator#NONNULL}.
1011      *
1012      * @implNote
1013      * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
1014      *
1015      * @return a {@code Spliterator} over the elements in this queue
1016      * @since 1.8
1017      */
spliterator()1018     public Spliterator<E> spliterator() {
1019         return new PBQSpliterator();
1020     }
1021 
1022     /**
1023      * @throws NullPointerException {@inheritDoc}
1024      */
removeIf(Predicate<? super E> filter)1025     public boolean removeIf(Predicate<? super E> filter) {
1026         Objects.requireNonNull(filter);
1027         return bulkRemove(filter);
1028     }
1029 
1030     /**
1031      * @throws NullPointerException {@inheritDoc}
1032      */
removeAll(Collection<?> c)1033     public boolean removeAll(Collection<?> c) {
1034         Objects.requireNonNull(c);
1035         return bulkRemove(e -> c.contains(e));
1036     }
1037 
1038     /**
1039      * @throws NullPointerException {@inheritDoc}
1040      */
retainAll(Collection<?> c)1041     public boolean retainAll(Collection<?> c) {
1042         Objects.requireNonNull(c);
1043         return bulkRemove(e -> !c.contains(e));
1044     }
1045 
1046     // A tiny bit set implementation
1047 
nBits(int n)1048     private static long[] nBits(int n) {
1049         return new long[((n - 1) >> 6) + 1];
1050     }
setBit(long[] bits, int i)1051     private static void setBit(long[] bits, int i) {
1052         bits[i >> 6] |= 1L << i;
1053     }
isClear(long[] bits, int i)1054     private static boolean isClear(long[] bits, int i) {
1055         return (bits[i >> 6] & (1L << i)) == 0;
1056     }
1057 
1058     /** Implementation of bulk remove methods. */
bulkRemove(Predicate<? super E> filter)1059     private boolean bulkRemove(Predicate<? super E> filter) {
1060         final ReentrantLock lock = this.lock;
1061         lock.lock();
1062         try {
1063             final Object[] es = queue;
1064             final int end = size;
1065             int i;
1066             // Optimize for initial run of survivors
1067             for (i = 0; i < end && !filter.test((E) es[i]); i++)
1068                 ;
1069             if (i >= end)
1070                 return false;
1071             // Tolerate predicates that reentrantly access the
1072             // collection for read, so traverse once to find elements
1073             // to delete, a second pass to physically expunge.
1074             final int beg = i;
1075             final long[] deathRow = nBits(end - beg);
1076             deathRow[0] = 1L;   // set bit 0
1077             for (i = beg + 1; i < end; i++)
1078                 if (filter.test((E) es[i]))
1079                     setBit(deathRow, i - beg);
1080             int w = beg;
1081             for (i = beg; i < end; i++)
1082                 if (isClear(deathRow, i - beg))
1083                     es[w++] = es[i];
1084             for (i = size = w; i < end; i++)
1085                 es[i] = null;
1086             heapify();
1087             return true;
1088         } finally {
1089             lock.unlock();
1090         }
1091     }
1092 
1093     /**
1094      * @throws NullPointerException {@inheritDoc}
1095      */
forEach(Consumer<? super E> action)1096     public void forEach(Consumer<? super E> action) {
1097         Objects.requireNonNull(action);
1098         final ReentrantLock lock = this.lock;
1099         lock.lock();
1100         try {
1101             final Object[] es = queue;
1102             for (int i = 0, n = size; i < n; i++)
1103                 action.accept((E) es[i]);
1104         } finally {
1105             lock.unlock();
1106         }
1107     }
1108 
// VarHandle mechanics: handle onto the int field "allocationSpinLock".
private static final VarHandle ALLOCATIONSPINLOCK;
static {
    final VarHandle handle;
    try {
        handle = MethodHandles.lookup().findVarHandle(
            PriorityBlockingQueue.class, "allocationSpinLock", int.class);
    } catch (ReflectiveOperationException e) {
        // A failed lookup is surfaced as a class-initialization failure.
        throw new ExceptionInInitializerError(e);
    }
    ALLOCATIONSPINLOCK = handle;
}
1121 }
1122