/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the <em>optional</em>
 * methods of the {@link Collection} and {@link Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm. The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts. Similarly for the takeLock. The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated. A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = count.get();
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC. We use the trick of
     * linking a Node that has just been dequeued to itself. Such a
     * self-link implicitly means to advance to head.next.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        // Creates a node holding x; next is left null.
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        // The former first node becomes the new item-less head.
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Locks to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    void fullyUnlock() {
        // Released in the reverse of the order used by fullyLock.
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // Start with a single item-less node serving as both head and tail.
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final int c;
        final Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            // c is the element count as it was before this insertion.
            c = count.getAndIncrement();
            // Still room after this insertion: cascade to another waiting put.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insertion: wake a waiting take.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final int c;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                // Remaining wait budget is used up: give up without inserting.
                if (nanos <= 0L)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            // c is the element count as it was before this insertion.
            c = count.getAndIncrement();
            // Still room after this insertion: cascade to another waiting put.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insertion: wake a waiting take.
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Check before taking the lock; re-checked below once it is held.
        if (count.get() == capacity)
            return false;
        final int c;
        final Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() == capacity)
                return false;
            enqueue(node);
            // c is the element count as it was before this insertion.
            c = count.getAndIncrement();
            // Still room after this insertion: cascade to another waiting put.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insertion: wake a waiting take.
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    // Removes and returns the head element, waiting on notEmpty while
    // count is zero.
    public E take() throws InterruptedException {
        final E x;
        final int c;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            // c is the element count as it was before this removal.
            c = count.getAndDecrement();
            // Elements remain after this removal: cascade to another waiting take.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal: wake a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    // Removes and returns the head element, waiting on notEmpty for at most
    // the given time while count is zero; returns null if the time runs out.
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        final E x;
        final int c;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0L)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            // c is the element count as it was before this removal.
            c = count.getAndDecrement();
            // Elements remain after this removal: cascade to another waiting take.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal: wake a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    // Removes and returns the head element without waiting; returns null
    // when count is zero.
    public E poll() {
        final AtomicInteger count = this.count;
        // Check before taking the lock; re-checked below once it is held.
        if (count.get() == 0)
            return null;
        final E x;
        final int c;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() == 0)
                return null;
            x = dequeue();
            // c is the element count as it was before this removal.
            c = count.getAndDecrement();
            // Elements remain after this removal: cascade to another waiting take.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal: wake a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    // Returns the head element without removing it, or null when count is
    // zero.
    public E peek() {
        final AtomicInteger count = this.count;
        // Check before taking the lock; re-checked below once it is held.
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            return (count.get() > 0) ? head.next.item : null;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor pred.
     */
    void unlink(Node<E> p, Node<E> pred) {
        // assert putLock.isHeldByCurrentThread();
        // assert takeLock.isHeldByCurrentThread();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        pred.next = p.next;
        if (last == p)
            last = pred;
        // The queue was full before this removal: wake a waiting put.
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present. More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> pred = head, p = pred.next;
                 p != null;
                 pred = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, pred);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue. (In other words, this method must allocate
     * a new array). The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array. If the queue fits in the specified array, it
     * is returned therein. Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs. Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    // Delegates to the package's Helpers.collectionToString.
    public String toString() {
        return Helpers.collectionToString(this);
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            // Self-link each abandoned node and clear its successor's item.
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            // assert head.item == null && head.next == null;
            // The queue was full before clearing: wake a waiting put.
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            // signalNotFull() takes putLock, so call it after releasing takeLock.
            if (signalNotFull)
                signalNotFull();
        }
    }

    /**
     * Used for any element traversal that is not entirely under lock.
     * Such traversals must handle both:
     * - dequeued nodes (p.next == p)
     * - (possibly multiple) interior removed nodes (p.item == null)
     */
    Node<E> succ(Node<E> p) {
        // A self-linked node has been dequeued; restart from head.next.
        if (p == (p = p.next))
            p = head.next;
        return p;
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    /**
     * Weakly-consistent iterator.
     *
     * Lazily updated ancestor field provides expected O(1) remove(),
     * but still O(n) in the worst case, whenever the saved ancestor
     * is concurrently deleted.
     */
    private class Itr implements Iterator<E> {
        private Node<E> next;           // Node holding nextItem
        private E nextItem;             // next item to hand out
        private Node<E> lastRet;
        private Node<E> ancestor;       // Helps unlink lastRet on remove()

        // Captures the first node and its item while holding both locks.
        Itr() {
            fullyLock();
            try {
                if ((next = head.next) != null)
                    nextItem = next.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return next != null;
        }

        public E next() {
            Node<E> p;
            if ((p = next) == null)
                throw new NoSuchElementException();
            lastRet = p;
            // Hand out the item captured earlier, then look ahead under lock.
            E x = nextItem;
            fullyLock();
            try {
                E e = null;
                // Skip nodes whose item has been cleared.
                for (p = p.next; p != null && (e = p.item) == null; )
                    p = succ(p);
                next = p;
                nextItem = e;
            } finally {
                fullyUnlock();
            }
            return x;
        }

        public void forEachRemaining(Consumer<? super E> action) {
            // A variant of forEachFrom
            Objects.requireNonNull(action);
            Node<E> p;
            if ((p = next) == null) return;
            lastRet = p;
            next = null;
            final int batchSize = 64;
            Object[] es = null;
            int n, len = 1;
            do {
                fullyLock();
                try {
                    if (es == null) {
                        p = p.next;
                        for (Node<E> q = p; q != null; q = succ(q))
                            if (q.item != null && ++len == batchSize)
                                break;
                        es = new Object[len];
                        // Slot 0 carries the item already captured in nextItem.
                        es[0] = nextItem;
                        nextItem = null;
                        n = 1;
                    } else
                        n = 0;
                    for (; p != null && n < len; p = succ(p))
                        if ((es[n] = p.item) != null) {
                            lastRet = p;
                            n++;
                        }
                } finally {
                    fullyUnlock();
                }
                // Run the action outside the locks.
                for (int i = 0; i < n; i++) {
                    @SuppressWarnings("unchecked") E e = (E) es[i];
                    action.accept(e);
                }
            } while (n > 0 && p != null);
        }

        public void remove() {
            Node<E> p = lastRet;
            if (p == null)
                throw new IllegalStateException();
            lastRet = null;
            fullyLock();
            try {
                // A null item means the node has already been removed.
                if (p.item != null) {
                    if (ancestor == null)
                        ancestor = head;
                    ancestor = findPred(p, ancestor);
                    unlink(p, ancestor);
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * A customized variant of Spliterators.IteratorSpliterator.
     * Keep this class in sync with (very similar) LBDSpliterator.
     */
    private final class LBQSpliterator implements Spliterator<E> {
        static final int MAX_BATCH = 1 << 25;   // max batch array size;
        Node<E> current;    // current node; null until initialized
        int batch;          // batch size for splits
        boolean exhausted;  // true when no more nodes
        long est = size();  // size estimate

        LBQSpliterator() {}

        public long estimateSize() { return est; }

        public Spliterator<E> trySplit() {
            Node<E> h;
            if (!exhausted &&
                ((h = current) != null || (h = head.next) != null)
                && h.next != null) {
                // Each split attempts a batch one larger than the last, capped.
                int n = batch = Math.min(batch + 1, MAX_BATCH);
                Object[] a = new Object[n];
                int i = 0;
                Node<E> p = current;
                fullyLock();
                try {
                    if (p != null || (p = head.next) != null)
                        for (; p != null && i < n; p = succ(p))
                            if ((a[i] = p.item) != null)
                                i++;
                } finally {
                    fullyUnlock();
                }
                if ((current = p) == null) {
                    est = 0L;
                    exhausted = true;
                }
                else if ((est -= i) < 0L)
                    est = 0L;
                if (i > 0)
                    return Spliterators.spliterator
                        (a, 0, i, (Spliterator.ORDERED |
                                   Spliterator.NONNULL |
                                   Spliterator.CONCURRENT));
            }
            return null;
        }

        public boolean tryAdvance(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            if (!exhausted) {
                E e = null;
                fullyLock();
                try {
                    Node<E> p;
                    if ((p = current) != null || (p = head.next) != null)
                        do {
                            e = p.item;
                            p = succ(p);
                        } while (e == null && p != null);
                    if ((current = p) == null)
                        exhausted = true;
                } finally {
                    fullyUnlock();
                }
                // Run the action outside the locks.
                if (e != null) {
                    action.accept(e);
                    return true;
                }
            }
            return false;
        }

        public void forEachRemaining(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            if (!exhausted) {
                exhausted = true;
                Node<E> p = current;
                current = null;
                forEachFrom(action, p);
            }
        }

        public int characteristics() {
            return (Spliterator.ORDERED |
                    Spliterator.NONNULL |
                    Spliterator.CONCURRENT);
        }
    }

    /**
     * Returns a {@link Spliterator} over the elements in this queue.
     *
     * <p>The returned spliterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
     *
     * @implNote
     * The {@code Spliterator} implements {@code trySplit} to permit limited
     * parallelism.
     *
     * @return a {@code Spliterator} over the elements in this queue
     * @since 1.8
     */
    public Spliterator<E> spliterator() {
        return new LBQSpliterator();
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public void forEach(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        forEachFrom(action, null);
    }

    /**
     * Runs action on each element found during a traversal starting at p.
     * If p is null, traversal starts at head.
     */
    void forEachFrom(Consumer<? super E> action, Node<E> p) {
        // Extract batches of elements while holding the lock; then
        // run the action on the elements while not
        final int batchSize = 64;       // max number of elements per batch
        Object[] es = null;             // container for batch of elements
        int n, len = 0;
        do {
            fullyLock();
            try {
                if (es == null) {
                    if (p == null) p = head.next;
                    // Size the batch array: count live items, up to batchSize.
                    for (Node<E> q = p; q != null; q = succ(q))
                        if (q.item != null && ++len == batchSize)
                            break;
                    es = new Object[len];
                }
                for (n = 0; p != null && n < len; p = succ(p))
                    if ((es[n] = p.item) != null)
                        n++;
            } finally {
                fullyUnlock();
            }
            for (int i = 0; i < n; i++) {
                @SuppressWarnings("unchecked") E e = (E) es[i];
                action.accept(e);
            }
        } while (n > 0 && p != null);
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean removeIf(Predicate<? super E> filter) {
        Objects.requireNonNull(filter);
        return bulkRemove(filter);
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean removeAll(Collection<?> c) {
        Objects.requireNonNull(c);
        return bulkRemove(e -> c.contains(e));
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean retainAll(Collection<?> c) {
        Objects.requireNonNull(c);
        return bulkRemove(e -> !c.contains(e));
    }

    /**
     * Returns the predecessor of live node p, given a node that was
     * once a live ancestor of p (or head); allows unlinking of p.
     */
    Node<E> findPred(Node<E> p, Node<E> ancestor) {
        // assert p.item != null;
        // An item-less ancestor is not a usable starting point; use head.
        if (ancestor.item == null)
            ancestor = head;
        // Fails with NPE if precondition not satisfied
        for (Node<E> q; (q = ancestor.next) != p; )
            ancestor = q;
        return ancestor;
    }

    /** Implementation of bulk remove methods. */
    @SuppressWarnings("unchecked")
    private boolean bulkRemove(Predicate<? super E> filter) {
        boolean removed = false;
        Node<E> p = null, ancestor = head;
        Node<E>[] nodes = null;
        int n, len = 0;
        do {
            // 1. Extract batch of up to 64 elements while holding the lock.
            fullyLock();
            try {
                if (nodes == null) {  // first batch; initialize
                    p = head.next;
                    for (Node<E> q = p; q != null; q = succ(q))
                        if (q.item != null && ++len == 64)
                            break;
                    nodes = (Node<E>[]) new Node<?>[len];
                }
                for (n = 0; p != null && n < len; p = succ(p))
                    nodes[n++] = p;
            } finally {
                fullyUnlock();
            }

            // 2. Run the filter on the elements while lock is free.
            long deathRow = 0L;       // "bitset" of size 64
            for (int i = 0; i < n; i++) {
                final E e;
                if ((e = nodes[i].item) != null && filter.test(e))
                    deathRow |= 1L << i;
            }

            // 3. Remove any filtered elements while holding the lock.
            if (deathRow != 0) {
                fullyLock();
                try {
                    for (int i = 0; i < n; i++) {
                        final Node<E> q;
                        // Re-check item: the node may have been removed
                        // while the lock was free.
                        if ((deathRow & (1L << i)) != 0L
                            && (q = nodes[i]).item != null) {
                            ancestor = findPred(q, ancestor);
                            unlink(q, ancestor);
                            removed = true;
                        }
                        nodes[i] = null; // help GC
                    }
                } finally {
                    fullyUnlock();
                }
            }
        } while (n > 0 && p != null);
        return removed;
    }

    /**
     * Saves this queue to a stream (that is, serializes it).
     *
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes it).
     * @param s the stream
     * @throws ClassNotFoundException if the class of a serialized object
     *         could not be found
     * @throws java.io.IOException if an I/O error occurs
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // head and last are transient, so rebuild the empty list here.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}