1 /* 2 * Written by Doug Lea and Martin Buchholz with assistance from members of 3 * JCP JSR-166 Expert Group and released to the public domain, as explained 4 * at http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 import java.util.AbstractQueue; 10 import java.util.Arrays; 11 import java.util.Collection; 12 import java.util.Iterator; 13 import java.util.NoSuchElementException; 14 import java.util.Objects; 15 import java.util.Queue; 16 import java.util.Spliterator; 17 import java.util.Spliterators; 18 import java.util.function.Consumer; 19 20 // BEGIN android-note 21 // removed link to collections framework docs 22 // END android-note 23 24 /** 25 * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes. 26 * This queue orders elements FIFO (first-in-first-out). 27 * The <em>head</em> of the queue is that element that has been on the 28 * queue the longest time. 29 * The <em>tail</em> of the queue is that element that has been on the 30 * queue the shortest time. New elements 31 * are inserted at the tail of the queue, and the queue retrieval 32 * operations obtain elements at the head of the queue. 33 * A {@code ConcurrentLinkedQueue} is an appropriate choice when 34 * many threads will share access to a common collection. 35 * Like most other concurrent collection implementations, this class 36 * does not permit the use of {@code null} elements. 37 * 38 * <p>This implementation employs an efficient <em>non-blocking</em> 39 * algorithm based on one described in 40 * <a href="http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf"> 41 * Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue 42 * Algorithms</a> by Maged M. Michael and Michael L. Scott. 43 * 44 * <p>Iterators are <i>weakly consistent</i>, returning elements 45 * reflecting the state of the queue at some point at or since the 46 * creation of the iterator. They do <em>not</em> throw {@link 47 * java.util.ConcurrentModificationException}, and may proceed concurrently 48 * with other operations. Elements contained in the queue since the creation 49 * of the iterator will be returned exactly once. 50 * 51 * <p>Beware that, unlike in most collections, the {@code size} method 52 * is <em>NOT</em> a constant-time operation. Because of the 53 * asynchronous nature of these queues, determining the current number 54 * of elements requires a traversal of the elements, and so may report 55 * inaccurate results if this collection is modified during traversal. 56 * Additionally, the bulk operations {@code addAll}, 57 * {@code removeAll}, {@code retainAll}, {@code containsAll}, 58 * {@code equals}, and {@code toArray} are <em>not</em> guaranteed 59 * to be performed atomically. For example, an iterator operating 60 * concurrently with an {@code addAll} operation might view only some 61 * of the added elements. 62 * 63 * <p>This class and its iterator implement all of the <em>optional</em> 64 * methods of the {@link Queue} and {@link Iterator} interfaces. 65 * 66 * <p>Memory consistency effects: As with other concurrent 67 * collections, actions in a thread prior to placing an object into a 68 * {@code ConcurrentLinkedQueue} 69 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 70 * actions subsequent to the access or removal of that element from 71 * the {@code ConcurrentLinkedQueue} in another thread. 72 * 73 * @since 1.5 74 * @author Doug Lea 75 * @param <E> the type of elements held in this queue 76 */ 77 public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> 78 implements Queue<E>, java.io.Serializable { 79 private static final long serialVersionUID = 196745693267521676L; 80 81 /* 82 * This is a modification of the Michael & Scott algorithm, 83 * adapted for a garbage-collected environment, with support for 84 * interior node deletion (to support remove(Object)). For 85 * explanation, read the paper. 86 * 87 * Note that like most non-blocking algorithms in this package, 88 * this implementation relies on the fact that in garbage 89 * collected systems, there is no possibility of ABA problems due 90 * to recycled nodes, so there is no need to use "counted 91 * pointers" or related techniques seen in versions used in 92 * non-GC'ed settings. 93 * 94 * The fundamental invariants are: 95 * - There is exactly one (last) Node with a null next reference, 96 * which is CASed when enqueueing. This last Node can be 97 * reached in O(1) time from tail, but tail is merely an 98 * optimization - it can always be reached in O(N) time from 99 * head as well. 100 * - The elements contained in the queue are the non-null items in 101 * Nodes that are reachable from head. CASing the item 102 * reference of a Node to null atomically removes it from the 103 * queue. Reachability of all elements from head must remain 104 * true even in the case of concurrent modifications that cause 105 * head to advance. A dequeued Node may remain in use 106 * indefinitely due to creation of an Iterator or simply a 107 * poll() that has lost its time slice. 108 * 109 * The above might appear to imply that all Nodes are GC-reachable 110 * from a predecessor dequeued Node. That would cause two problems: 111 * - allow a rogue Iterator to cause unbounded memory retention 112 * - cause cross-generational linking of old Nodes to new Nodes if 113 * a Node was tenured while live, which generational GCs have a 114 * hard time dealing with, causing repeated major collections. 115 * However, only non-deleted Nodes need to be reachable from 116 * dequeued Nodes, and reachability does not necessarily have to 117 * be of the kind understood by the GC. We use the trick of 118 * linking a Node that has just been dequeued to itself. Such a 119 * self-link implicitly means to advance to head. 120 * 121 * Both head and tail are permitted to lag. In fact, failing to 122 * update them every time one could is a significant optimization 123 * (fewer CASes). As with LinkedTransferQueue (see the internal 124 * documentation for that class), we use a slack threshold of two; 125 * that is, we update head/tail when the current pointer appears 126 * to be two or more steps away from the first/last node. 127 * 128 * Since head and tail are updated concurrently and independently, 129 * it is possible for tail to lag behind head (why not)? 130 * 131 * CASing a Node's item reference to null atomically removes the 132 * element from the queue. Iterators skip over Nodes with null 133 * items. Prior implementations of this class had a race between 134 * poll() and remove(Object) where the same element would appear 135 * to be successfully removed by two concurrent operations. The 136 * method remove(Object) also lazily unlinks deleted Nodes, but 137 * this is merely an optimization. 138 * 139 * When constructing a Node (before enqueuing it) we avoid paying 140 * for a volatile write to item by using Unsafe.putObject instead 141 * of a normal write. This allows the cost of enqueue to be 142 * "one-and-a-half" CASes. 143 * 144 * Both head and tail may or may not point to a Node with a 145 * non-null item. If the queue is empty, all items must of course 146 * be null. Upon creation, both head and tail refer to a dummy 147 * Node with null item. Both head and tail are only updated using 148 * CAS, so they never regress, although again this is merely an 149 * optimization. 150 */ 151 152 private static class Node<E> { 153 volatile E item; 154 volatile Node<E> next; 155 } 156 157 /** 158 * Returns a new node holding item. Uses relaxed write because item 159 * can only be seen after piggy-backing publication via casNext. 160 */ newNode(E item)161 static <E> Node<E> newNode(E item) { 162 Node<E> node = new Node<E>(); 163 U.putObject(node, ITEM, item); 164 return node; 165 } 166 casItem(Node<E> node, E cmp, E val)167 static <E> boolean casItem(Node<E> node, E cmp, E val) { 168 return U.compareAndSwapObject(node, ITEM, cmp, val); 169 } 170 lazySetNext(Node<E> node, Node<E> val)171 static <E> void lazySetNext(Node<E> node, Node<E> val) { 172 U.putOrderedObject(node, NEXT, val); 173 } 174 casNext(Node<E> node, Node<E> cmp, Node<E> val)175 static <E> boolean casNext(Node<E> node, Node<E> cmp, Node<E> val) { 176 return U.compareAndSwapObject(node, NEXT, cmp, val); 177 } 178 179 /** 180 * A node from which the first live (non-deleted) node (if any) 181 * can be reached in O(1) time. 182 * Invariants: 183 * - all live nodes are reachable from head via succ() 184 * - head != null 185 * - (tmp = head).next != tmp || tmp != head 186 * Non-invariants: 187 * - head.item may or may not be null. 188 * - it is permitted for tail to lag behind head, that is, for tail 189 * to not be reachable from head! 190 */ 191 transient volatile Node<E> head; 192 193 /** 194 * A node from which the last node on list (that is, the unique 195 * node with node.next == null) can be reached in O(1) time. 196 * Invariants: 197 * - the last node is always reachable from tail via succ() 198 * - tail != null 199 * Non-invariants: 200 * - tail.item may or may not be null. 201 * - it is permitted for tail to lag behind head, that is, for tail 202 * to not be reachable from head! 203 * - tail.next may or may not be self-pointing to tail. 204 */ 205 private transient volatile Node<E> tail; 206 207 /** 208 * Creates a {@code ConcurrentLinkedQueue} that is initially empty. 209 */ ConcurrentLinkedQueue()210 public ConcurrentLinkedQueue() { 211 head = tail = newNode(null); 212 } 213 214 /** 215 * Creates a {@code ConcurrentLinkedQueue} 216 * initially containing the elements of the given collection, 217 * added in traversal order of the collection's iterator. 218 * 219 * @param c the collection of elements to initially contain 220 * @throws NullPointerException if the specified collection or any 221 * of its elements are null 222 */ ConcurrentLinkedQueue(Collection<? extends E> c)223 public ConcurrentLinkedQueue(Collection<? extends E> c) { 224 Node<E> h = null, t = null; 225 for (E e : c) { 226 Node<E> newNode = newNode(Objects.requireNonNull(e)); 227 if (h == null) 228 h = t = newNode; 229 else { 230 lazySetNext(t, newNode); 231 t = newNode; 232 } 233 } 234 if (h == null) 235 h = t = newNode(null); 236 head = h; 237 tail = t; 238 } 239 240 // Have to override just to update the javadoc 241 242 /** 243 * Inserts the specified element at the tail of this queue. 244 * As the queue is unbounded, this method will never throw 245 * {@link IllegalStateException} or return {@code false}. 246 * 247 * @return {@code true} (as specified by {@link Collection#add}) 248 * @throws NullPointerException if the specified element is null 249 */ add(E e)250 public boolean add(E e) { 251 return offer(e); 252 } 253 254 /** 255 * Tries to CAS head to p. If successful, repoint old head to itself 256 * as sentinel for succ(), below. 257 */ updateHead(Node<E> h, Node<E> p)258 final void updateHead(Node<E> h, Node<E> p) { 259 // assert h != null && p != null && (h == p || h.item == null); 260 if (h != p && casHead(h, p)) 261 lazySetNext(h, h); 262 } 263 264 /** 265 * Returns the successor of p, or the head node if p.next has been 266 * linked to self, which will only be true if traversing with a 267 * stale pointer that is now off the list. 268 */ succ(Node<E> p)269 final Node<E> succ(Node<E> p) { 270 Node<E> next = p.next; 271 return (p == next) ? head : next; 272 } 273 274 /** 275 * Inserts the specified element at the tail of this queue. 276 * As the queue is unbounded, this method will never return {@code false}. 277 * 278 * @return {@code true} (as specified by {@link Queue#offer}) 279 * @throws NullPointerException if the specified element is null 280 */ offer(E e)281 public boolean offer(E e) { 282 final Node<E> newNode = newNode(Objects.requireNonNull(e)); 283 284 for (Node<E> t = tail, p = t;;) { 285 Node<E> q = p.next; 286 if (q == null) { 287 // p is last node 288 if (casNext(p, null, newNode)) { 289 // Successful CAS is the linearization point 290 // for e to become an element of this queue, 291 // and for newNode to become "live". 292 if (p != t) // hop two nodes at a time 293 casTail(t, newNode); // Failure is OK. 294 return true; 295 } 296 // Lost CAS race to another thread; re-read next 297 } 298 else if (p == q) 299 // We have fallen off list. If tail is unchanged, it 300 // will also be off-list, in which case we need to 301 // jump to head, from which all live nodes are always 302 // reachable. Else the new tail is a better bet. 303 p = (t != (t = tail)) ? t : head; 304 else 305 // Check for tail updates after two hops. 306 p = (p != t && t != (t = tail)) ? t : q; 307 } 308 } 309 poll()310 public E poll() { 311 restartFromHead: 312 for (;;) { 313 for (Node<E> h = head, p = h, q;;) { 314 E item = p.item; 315 316 if (item != null && casItem(p, item, null)) { 317 // Successful CAS is the linearization point 318 // for item to be removed from this queue. 319 if (p != h) // hop two nodes at a time 320 updateHead(h, ((q = p.next) != null) ? q : p); 321 return item; 322 } 323 else if ((q = p.next) == null) { 324 updateHead(h, p); 325 return null; 326 } 327 else if (p == q) 328 continue restartFromHead; 329 else 330 p = q; 331 } 332 } 333 } 334 peek()335 public E peek() { 336 restartFromHead: 337 for (;;) { 338 for (Node<E> h = head, p = h, q;;) { 339 E item = p.item; 340 if (item != null || (q = p.next) == null) { 341 updateHead(h, p); 342 return item; 343 } 344 else if (p == q) 345 continue restartFromHead; 346 else 347 p = q; 348 } 349 } 350 } 351 352 /** 353 * Returns the first live (non-deleted) node on list, or null if none. 354 * This is yet another variant of poll/peek; here returning the 355 * first node, not element. We could make peek() a wrapper around 356 * first(), but that would cost an extra volatile read of item, 357 * and the need to add a retry loop to deal with the possibility 358 * of losing a race to a concurrent poll(). 359 */ first()360 Node<E> first() { 361 restartFromHead: 362 for (;;) { 363 for (Node<E> h = head, p = h, q;;) { 364 boolean hasItem = (p.item != null); 365 if (hasItem || (q = p.next) == null) { 366 updateHead(h, p); 367 return hasItem ? p : null; 368 } 369 else if (p == q) 370 continue restartFromHead; 371 else 372 p = q; 373 } 374 } 375 } 376 377 /** 378 * Returns {@code true} if this queue contains no elements. 379 * 380 * @return {@code true} if this queue contains no elements 381 */ isEmpty()382 public boolean isEmpty() { 383 return first() == null; 384 } 385 386 /** 387 * Returns the number of elements in this queue. If this queue 388 * contains more than {@code Integer.MAX_VALUE} elements, returns 389 * {@code Integer.MAX_VALUE}. 390 * 391 * <p>Beware that, unlike in most collections, this method is 392 * <em>NOT</em> a constant-time operation. Because of the 393 * asynchronous nature of these queues, determining the current 394 * number of elements requires an O(n) traversal. 395 * Additionally, if elements are added or removed during execution 396 * of this method, the returned result may be inaccurate. Thus, 397 * this method is typically not very useful in concurrent 398 * applications. 399 * 400 * @return the number of elements in this queue 401 */ size()402 public int size() { 403 restartFromHead: for (;;) { 404 int count = 0; 405 for (Node<E> p = first(); p != null;) { 406 if (p.item != null) 407 if (++count == Integer.MAX_VALUE) 408 break; // @see Collection.size() 409 if (p == (p = p.next)) 410 continue restartFromHead; 411 } 412 return count; 413 } 414 } 415 416 /** 417 * Returns {@code true} if this queue contains the specified element. 418 * More formally, returns {@code true} if and only if this queue contains 419 * at least one element {@code e} such that {@code o.equals(e)}. 420 * 421 * @param o object to be checked for containment in this queue 422 * @return {@code true} if this queue contains the specified element 423 */ contains(Object o)424 public boolean contains(Object o) { 425 if (o != null) { 426 for (Node<E> p = first(); p != null; p = succ(p)) { 427 E item = p.item; 428 if (item != null && o.equals(item)) 429 return true; 430 } 431 } 432 return false; 433 } 434 435 /** 436 * Removes a single instance of the specified element from this queue, 437 * if it is present. More formally, removes an element {@code e} such 438 * that {@code o.equals(e)}, if this queue contains one or more such 439 * elements. 440 * Returns {@code true} if this queue contained the specified element 441 * (or equivalently, if this queue changed as a result of the call). 442 * 443 * @param o element to be removed from this queue, if present 444 * @return {@code true} if this queue changed as a result of the call 445 */ remove(Object o)446 public boolean remove(Object o) { 447 if (o != null) { 448 Node<E> next, pred = null; 449 for (Node<E> p = first(); p != null; pred = p, p = next) { 450 boolean removed = false; 451 E item = p.item; 452 if (item != null) { 453 if (!o.equals(item)) { 454 next = succ(p); 455 continue; 456 } 457 removed = casItem(p, item, null); 458 } 459 460 next = succ(p); 461 if (pred != null && next != null) // unlink 462 casNext(pred, p, next); 463 if (removed) 464 return true; 465 } 466 } 467 return false; 468 } 469 470 /** 471 * Appends all of the elements in the specified collection to the end of 472 * this queue, in the order that they are returned by the specified 473 * collection's iterator. Attempts to {@code addAll} of a queue to 474 * itself result in {@code IllegalArgumentException}. 475 * 476 * @param c the elements to be inserted into this queue 477 * @return {@code true} if this queue changed as a result of the call 478 * @throws NullPointerException if the specified collection or any 479 * of its elements are null 480 * @throws IllegalArgumentException if the collection is this queue 481 */ addAll(Collection<? extends E> c)482 public boolean addAll(Collection<? extends E> c) { 483 if (c == this) 484 // As historically specified in AbstractQueue#addAll 485 throw new IllegalArgumentException(); 486 487 // Copy c into a private chain of Nodes 488 Node<E> beginningOfTheEnd = null, last = null; 489 for (E e : c) { 490 Node<E> newNode = newNode(Objects.requireNonNull(e)); 491 if (beginningOfTheEnd == null) 492 beginningOfTheEnd = last = newNode; 493 else { 494 lazySetNext(last, newNode); 495 last = newNode; 496 } 497 } 498 if (beginningOfTheEnd == null) 499 return false; 500 501 // Atomically append the chain at the tail of this collection 502 for (Node<E> t = tail, p = t;;) { 503 Node<E> q = p.next; 504 if (q == null) { 505 // p is last node 506 if (casNext(p, null, beginningOfTheEnd)) { 507 // Successful CAS is the linearization point 508 // for all elements to be added to this queue. 509 if (!casTail(t, last)) { 510 // Try a little harder to update tail, 511 // since we may be adding many elements. 512 t = tail; 513 if (last.next == null) 514 casTail(t, last); 515 } 516 return true; 517 } 518 // Lost CAS race to another thread; re-read next 519 } 520 else if (p == q) 521 // We have fallen off list. If tail is unchanged, it 522 // will also be off-list, in which case we need to 523 // jump to head, from which all live nodes are always 524 // reachable. Else the new tail is a better bet. 525 p = (t != (t = tail)) ? t : head; 526 else 527 // Check for tail updates after two hops. 528 p = (p != t && t != (t = tail)) ? t : q; 529 } 530 } 531 toString()532 public String toString() { 533 String[] a = null; 534 restartFromHead: for (;;) { 535 int charLength = 0; 536 int size = 0; 537 for (Node<E> p = first(); p != null;) { 538 E item = p.item; 539 if (item != null) { 540 if (a == null) 541 a = new String[4]; 542 else if (size == a.length) 543 a = Arrays.copyOf(a, 2 * size); 544 String s = item.toString(); 545 a[size++] = s; 546 charLength += s.length(); 547 } 548 if (p == (p = p.next)) 549 continue restartFromHead; 550 } 551 552 if (size == 0) 553 return "[]"; 554 555 return Helpers.toString(a, size, charLength); 556 } 557 } 558 toArrayInternal(Object[] a)559 private Object[] toArrayInternal(Object[] a) { 560 Object[] x = a; 561 restartFromHead: for (;;) { 562 int size = 0; 563 for (Node<E> p = first(); p != null;) { 564 E item = p.item; 565 if (item != null) { 566 if (x == null) 567 x = new Object[4]; 568 else if (size == x.length) 569 x = Arrays.copyOf(x, 2 * (size + 4)); 570 x[size++] = item; 571 } 572 if (p == (p = p.next)) 573 continue restartFromHead; 574 } 575 if (x == null) 576 return new Object[0]; 577 else if (a != null && size <= a.length) { 578 if (a != x) 579 System.arraycopy(x, 0, a, 0, size); 580 if (size < a.length) 581 a[size] = null; 582 return a; 583 } 584 return (size == x.length) ? x : Arrays.copyOf(x, size); 585 } 586 } 587 588 /** 589 * Returns an array containing all of the elements in this queue, in 590 * proper sequence. 591 * 592 * <p>The returned array will be "safe" in that no references to it are 593 * maintained by this queue. (In other words, this method must allocate 594 * a new array). The caller is thus free to modify the returned array. 595 * 596 * <p>This method acts as bridge between array-based and collection-based 597 * APIs. 598 * 599 * @return an array containing all of the elements in this queue 600 */ toArray()601 public Object[] toArray() { 602 return toArrayInternal(null); 603 } 604 605 /** 606 * Returns an array containing all of the elements in this queue, in 607 * proper sequence; the runtime type of the returned array is that of 608 * the specified array. If the queue fits in the specified array, it 609 * is returned therein. Otherwise, a new array is allocated with the 610 * runtime type of the specified array and the size of this queue. 611 * 612 * <p>If this queue fits in the specified array with room to spare 613 * (i.e., the array has more elements than this queue), the element in 614 * the array immediately following the end of the queue is set to 615 * {@code null}. 616 * 617 * <p>Like the {@link #toArray()} method, this method acts as bridge between 618 * array-based and collection-based APIs. Further, this method allows 619 * precise control over the runtime type of the output array, and may, 620 * under certain circumstances, be used to save allocation costs. 621 * 622 * <p>Suppose {@code x} is a queue known to contain only strings. 623 * The following code can be used to dump the queue into a newly 624 * allocated array of {@code String}: 625 * 626 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 627 * 628 * Note that {@code toArray(new Object[0])} is identical in function to 629 * {@code toArray()}. 630 * 631 * @param a the array into which the elements of the queue are to 632 * be stored, if it is big enough; otherwise, a new array of the 633 * same runtime type is allocated for this purpose 634 * @return an array containing all of the elements in this queue 635 * @throws ArrayStoreException if the runtime type of the specified array 636 * is not a supertype of the runtime type of every element in 637 * this queue 638 * @throws NullPointerException if the specified array is null 639 */ 640 @SuppressWarnings("unchecked") toArray(T[] a)641 public <T> T[] toArray(T[] a) { 642 if (a == null) throw new NullPointerException(); 643 return (T[]) toArrayInternal(a); 644 } 645 646 /** 647 * Returns an iterator over the elements in this queue in proper sequence. 648 * The elements will be returned in order from first (head) to last (tail). 649 * 650 * <p>The returned iterator is 651 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 652 * 653 * @return an iterator over the elements in this queue in proper sequence 654 */ iterator()655 public Iterator<E> iterator() { 656 return new Itr(); 657 } 658 659 private class Itr implements Iterator<E> { 660 /** 661 * Next node to return item for. 662 */ 663 private Node<E> nextNode; 664 665 /** 666 * nextItem holds on to item fields because once we claim 667 * that an element exists in hasNext(), we must return it in 668 * the following next() call even if it was in the process of 669 * being removed when hasNext() was called. 670 */ 671 private E nextItem; 672 673 /** 674 * Node of the last returned item, to support remove. 675 */ 676 private Node<E> lastRet; 677 Itr()678 Itr() { 679 restartFromHead: for (;;) { 680 Node<E> h, p, q; 681 for (p = h = head;; p = q) { 682 E item; 683 if ((item = p.item) != null) { 684 nextNode = p; 685 nextItem = item; 686 break; 687 } 688 else if ((q = p.next) == null) 689 break; 690 else if (p == q) 691 continue restartFromHead; 692 } 693 updateHead(h, p); 694 return; 695 } 696 } 697 hasNext()698 public boolean hasNext() { 699 return nextItem != null; 700 } 701 next()702 public E next() { 703 final Node<E> pred = nextNode; 704 if (pred == null) throw new NoSuchElementException(); 705 // assert nextItem != null; 706 lastRet = pred; 707 E item = null; 708 709 for (Node<E> p = succ(pred), q;; p = q) { 710 if (p == null || (item = p.item) != null) { 711 nextNode = p; 712 E x = nextItem; 713 nextItem = item; 714 return x; 715 } 716 // unlink deleted nodes 717 if ((q = succ(p)) != null) 718 casNext(pred, p, q); 719 } 720 } 721 remove()722 public void remove() { 723 Node<E> l = lastRet; 724 if (l == null) throw new IllegalStateException(); 725 // rely on a future traversal to relink. 726 l.item = null; 727 lastRet = null; 728 } 729 } 730 731 /** 732 * Saves this queue to a stream (that is, serializes it). 733 * 734 * @param s the stream 735 * @throws java.io.IOException if an I/O error occurs 736 * @serialData All of the elements (each an {@code E}) in 737 * the proper order, followed by a null 738 */ writeObject(java.io.ObjectOutputStream s)739 private void writeObject(java.io.ObjectOutputStream s) 740 throws java.io.IOException { 741 742 // Write out any hidden stuff 743 s.defaultWriteObject(); 744 745 // Write out all elements in the proper order. 746 for (Node<E> p = first(); p != null; p = succ(p)) { 747 Object item = p.item; 748 if (item != null) 749 s.writeObject(item); 750 } 751 752 // Use trailing null as sentinel 753 s.writeObject(null); 754 } 755 756 /** 757 * Reconstitutes this queue from a stream (that is, deserializes it). 758 * @param s the stream 759 * @throws ClassNotFoundException if the class of a serialized object 760 * could not be found 761 * @throws java.io.IOException if an I/O error occurs 762 */ readObject(java.io.ObjectInputStream s)763 private void readObject(java.io.ObjectInputStream s) 764 throws java.io.IOException, ClassNotFoundException { 765 s.defaultReadObject(); 766 767 // Read in elements until trailing null sentinel found 768 Node<E> h = null, t = null; 769 for (Object item; (item = s.readObject()) != null; ) { 770 @SuppressWarnings("unchecked") 771 Node<E> newNode = newNode((E) item); 772 if (h == null) 773 h = t = newNode; 774 else { 775 lazySetNext(t, newNode); 776 t = newNode; 777 } 778 } 779 if (h == null) 780 h = t = newNode(null); 781 head = h; 782 tail = t; 783 } 784 785 /** A customized variant of Spliterators.IteratorSpliterator */ 786 static final class CLQSpliterator<E> implements Spliterator<E> { 787 static final int MAX_BATCH = 1 << 25; // max batch array size; 788 final ConcurrentLinkedQueue<E> queue; 789 Node<E> current; // current node; null until initialized 790 int batch; // batch size for splits 791 boolean exhausted; // true when no more nodes CLQSpliterator(ConcurrentLinkedQueue<E> queue)792 CLQSpliterator(ConcurrentLinkedQueue<E> queue) { 793 this.queue = queue; 794 } 795 trySplit()796 public Spliterator<E> trySplit() { 797 Node<E> p; 798 final ConcurrentLinkedQueue<E> q = this.queue; 799 int b = batch; 800 int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; 801 if (!exhausted && 802 ((p = current) != null || (p = q.first()) != null) && 803 p.next != null) { 804 Object[] a = new Object[n]; 805 int i = 0; 806 do { 807 if ((a[i] = p.item) != null) 808 ++i; 809 if (p == (p = p.next)) 810 p = q.first(); 811 } while (p != null && i < n); 812 if ((current = p) == null) 813 exhausted = true; 814 if (i > 0) { 815 batch = i; 816 return Spliterators.spliterator 817 (a, 0, i, (Spliterator.ORDERED | 818 Spliterator.NONNULL | 819 Spliterator.CONCURRENT)); 820 } 821 } 822 return null; 823 } 824 forEachRemaining(Consumer<? super E> action)825 public void forEachRemaining(Consumer<? super E> action) { 826 Node<E> p; 827 if (action == null) throw new NullPointerException(); 828 final ConcurrentLinkedQueue<E> q = this.queue; 829 if (!exhausted && 830 ((p = current) != null || (p = q.first()) != null)) { 831 exhausted = true; 832 do { 833 E e = p.item; 834 if (p == (p = p.next)) 835 p = q.first(); 836 if (e != null) 837 action.accept(e); 838 } while (p != null); 839 } 840 } 841 tryAdvance(Consumer<? super E> action)842 public boolean tryAdvance(Consumer<? super E> action) { 843 Node<E> p; 844 if (action == null) throw new NullPointerException(); 845 final ConcurrentLinkedQueue<E> q = this.queue; 846 if (!exhausted && 847 ((p = current) != null || (p = q.first()) != null)) { 848 E e; 849 do { 850 e = p.item; 851 if (p == (p = p.next)) 852 p = q.first(); 853 } while (e == null && p != null); 854 if ((current = p) == null) 855 exhausted = true; 856 if (e != null) { 857 action.accept(e); 858 return true; 859 } 860 } 861 return false; 862 } 863 estimateSize()864 public long estimateSize() { return Long.MAX_VALUE; } 865 characteristics()866 public int characteristics() { 867 return Spliterator.ORDERED | Spliterator.NONNULL | 868 Spliterator.CONCURRENT; 869 } 870 } 871 872 /** 873 * Returns a {@link Spliterator} over the elements in this queue. 874 * 875 * <p>The returned spliterator is 876 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 877 * 878 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, 879 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. 880 * 881 * @implNote 882 * The {@code Spliterator} implements {@code trySplit} to permit limited 883 * parallelism. 884 * 885 * @return a {@code Spliterator} over the elements in this queue 886 * @since 1.8 887 */ 888 @Override spliterator()889 public Spliterator<E> spliterator() { 890 return new CLQSpliterator<E>(this); 891 } 892 casTail(Node<E> cmp, Node<E> val)893 private boolean casTail(Node<E> cmp, Node<E> val) { 894 return U.compareAndSwapObject(this, TAIL, cmp, val); 895 } 896 casHead(Node<E> cmp, Node<E> val)897 private boolean casHead(Node<E> cmp, Node<E> val) { 898 return U.compareAndSwapObject(this, HEAD, cmp, val); 899 } 900 901 // Unsafe mechanics 902 903 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); 904 private static final long HEAD; 905 private static final long TAIL; 906 private static final long ITEM; 907 private static final long NEXT; 908 static { 909 try { 910 HEAD = U.objectFieldOffset 911 (ConcurrentLinkedQueue.class.getDeclaredField("head")); 912 TAIL = U.objectFieldOffset 913 (ConcurrentLinkedQueue.class.getDeclaredField("tail")); 914 ITEM = U.objectFieldOffset 915 (Node.class.getDeclaredField("item")); 916 NEXT = U.objectFieldOffset 917 (Node.class.getDeclaredField("next")); 918 } catch (ReflectiveOperationException e) { 919 throw new Error(e); 920 } 921 } 922 } 923