/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// BEGIN android-note
// removed link to collections framework docs
// END android-note

/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated.  A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = count.get();
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.next.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Returns the node to visit after {@code p} in a traversal that may
     * have been interleaved with removals.  A node that has been dequeued
     * is linked to itself; in that case the traversal resumes at
     * {@code head.next}.  Callers must hold both locks.
     *
     * @param p a node previously obtained from this queue
     * @return the successor of {@code p}, or {@code null} if there is none
     */
    Node<E> succ(Node<E> p) {
        // assert isFullyLocked();
        Node<E> next = p.next;
        return (next == p) ? head.next : next;
    }

    /**
     * Locks to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

//     /**
//      * Tells whether both locks are held by current thread.
//      */
//     boolean isFullyLocked() {
//         return (putLock.isHeldByCurrentThread() &&
//                 takeLock.isHeldByCurrentThread());
//     }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0L)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under the lock; the unlocked test above is only a fast path.
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @return the head of this queue, or {@code null} if the specified
     *         waiting time elapses before an element is available
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0L)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Re-check under the lock; the unlocked test above is only a fast path.
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            return (count.get() > 0) ? head.next.item : null;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor trail.
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    public String toString() {
        return Helpers.collectionToString(this);
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            // assert head.item == null && head.next == null;
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */

        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                lastRet = current;
                E item = null;
                // Unlike other traversal methods, iterators must handle both:
                // - dequeued nodes (p.next == p)
                // - (possibly multiple) interior removed nodes (p.item == null)
                for (Node<E> p = current, q;; p = q) {
                    if ((q = p.next) == p)
                        q = head.next;
                    if (q == null || (item = q.item) != null) {
                        current = q;
                        E x = currentElement;
                        currentElement = item;
                        return x;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * A customized variant of Spliterators.IteratorSpliterator.
     *
     * <p>The spliterator keeps a node reference between calls without
     * holding any lock, so that node may have been dequeued (self-linked,
     * item nulled) or unlinked (item nulled) in the meantime.  Every
     * traversal step therefore advances with {@link #succ} rather than
     * following {@code next} directly; following a self-link would
     * otherwise loop forever while holding both queue locks.
     */
    static final class LBQSpliterator<E> implements Spliterator<E> {
        static final int MAX_BATCH = 1 << 25;  // max batch array size;
        final LinkedBlockingQueue<E> queue;
        Node<E> current;    // current node; null until initialized
        int batch;          // batch size for splits
        boolean exhausted;  // true when no more nodes
        long est;           // size estimate

        LBQSpliterator(LinkedBlockingQueue<E> queue) {
            this.queue = queue;
            this.est = queue.size();
        }

        public long estimateSize() { return est; }

        public Spliterator<E> trySplit() {
            Node<E> h;
            final LinkedBlockingQueue<E> q = this.queue;
            int b = batch;
            int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
            if (!exhausted &&
                ((h = current) != null || (h = q.head.next) != null) &&
                h.next != null) {
                Object[] a = new Object[n];
                int i = 0;
                Node<E> p = current;
                q.fullyLock();
                try {
                    if (p != null || (p = q.head.next) != null) {
                        do {
                            // Removed nodes have a null item and are skipped.
                            if ((a[i] = p.item) != null)
                                ++i;
                        } while ((p = q.succ(p)) != null && i < n);
                    }
                } finally {
                    q.fullyUnlock();
                }
                if ((current = p) == null) {
                    est = 0L;
                    exhausted = true;
                }
                else if ((est -= i) < 0L)
                    est = 0L;
                if (i > 0) {
                    batch = i;
                    return Spliterators.spliterator
                        (a, 0, i, (Spliterator.ORDERED |
                                   Spliterator.NONNULL |
                                   Spliterator.CONCURRENT));
                }
            }
            return null;
        }

        public void forEachRemaining(Consumer<? super E> action) {
            if (action == null) throw new NullPointerException();
            final LinkedBlockingQueue<E> q = this.queue;
            if (!exhausted) {
                exhausted = true;
                Node<E> p = current;
                do {
                    E e = null;
                    // Lock only while locating the next element; the action
                    // itself runs without the queue locks held.
                    q.fullyLock();
                    try {
                        if (p == null)
                            p = q.head.next;
                        while (p != null) {
                            e = p.item;
                            p = q.succ(p);
                            if (e != null)
                                break;
                        }
                    } finally {
                        q.fullyUnlock();
                    }
                    if (e != null)
                        action.accept(e);
                } while (p != null);
            }
        }

        public boolean tryAdvance(Consumer<? super E> action) {
            if (action == null) throw new NullPointerException();
            final LinkedBlockingQueue<E> q = this.queue;
            if (!exhausted) {
                E e = null;
                q.fullyLock();
                try {
                    if (current == null)
                        current = q.head.next;
                    while (current != null) {
                        e = current.item;
                        current = q.succ(current);
                        if (e != null)
                            break;
                    }
                } finally {
                    q.fullyUnlock();
                }
                if (current == null)
                    exhausted = true;
                if (e != null) {
                    action.accept(e);
                    return true;
                }
            }
            return false;
        }

        public int characteristics() {
            return Spliterator.ORDERED | Spliterator.NONNULL |
                Spliterator.CONCURRENT;
        }
    }

    /**
     * Returns a {@link Spliterator} over the elements in this queue.
     *
     * <p>The returned spliterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
     *
     * @implNote
     * The {@code Spliterator} implements {@code trySplit} to permit limited
     * parallelism.
     *
     * @return a {@code Spliterator} over the elements in this queue
     * @since 1.8
     */
    public Spliterator<E> spliterator() {
        return new LBQSpliterator<E>(this);
    }

    /**
     * Saves this queue to a stream (that is, serializes it).
     *
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes it).
     * @param s the stream
     * @throws ClassNotFoundException if the class of a serialized object
     *         could not be found
     * @throws java.io.IOException if an I/O error occurs
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}