1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.invoke.MethodHandles; 39 import java.lang.invoke.VarHandle; 40 import java.util.AbstractQueue; 41 import java.util.Arrays; 42 import java.util.Collection; 43 import java.util.Iterator; 44 import java.util.NoSuchElementException; 45 import java.util.Objects; 46 import java.util.Queue; 47 import java.util.Spliterator; 48 import java.util.Spliterators; 49 import java.util.concurrent.locks.LockSupport; 50 import java.util.concurrent.ForkJoinWorkerThread; 51 import java.util.function.Consumer; 52 import java.util.function.Predicate; 53 54 /** 55 * An unbounded {@link TransferQueue} based on linked nodes. 56 * This queue orders elements FIFO (first-in-first-out) with respect 57 * to any given producer. The <em>head</em> of the queue is that 58 * element that has been on the queue the longest time for some 59 * producer. The <em>tail</em> of the queue is that element that has 60 * been on the queue the shortest time for some producer. 61 * 62 * <p>Beware that, unlike in most collections, the {@code size} method 63 * is <em>NOT</em> a constant-time operation. Because of the 64 * asynchronous nature of these queues, determining the current number 65 * of elements requires a traversal of the elements, and so may report 66 * inaccurate results if this collection is modified during traversal. 67 * 68 * <p>Bulk operations that add, remove, or examine multiple elements, 69 * such as {@link #addAll}, {@link #removeIf} or {@link #forEach}, 70 * are <em>not</em> guaranteed to be performed atomically. 71 * For example, a {@code forEach} traversal concurrent with an {@code 72 * addAll} operation might observe only some of the added elements. 73 * 74 * <p>This class and its iterator implement all of the <em>optional</em> 75 * methods of the {@link Collection} and {@link Iterator} interfaces. 
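 *
 * <p>As a simple usage sketch (in code that can propagate
 * {@link InterruptedException}), a producer can hand an element
 * directly to a consumer: {@code transfer} does not return until
 * some consumer has received the element:
 *
 * <pre> {@code
 * LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
 *
 * // consumer thread: blocks in take() until an element arrives
 * Thread consumer = new Thread(() -> {
 *   try {
 *     System.out.println("received " + queue.take());
 *   } catch (InterruptedException ie) {
 *     Thread.currentThread().interrupt();
 *   }
 * });
 * consumer.start();
 *
 * // producer: transfer() returns only after the consumer receives "hello"
 * queue.transfer("hello");}</pre>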
76 * 77 * <p>Memory consistency effects: As with other concurrent 78 * collections, actions in a thread prior to placing an object into a 79 * {@code LinkedTransferQueue} 80 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 81 * actions subsequent to the access or removal of that element from 82 * the {@code LinkedTransferQueue} in another thread. 83 * 84 * <p>This class is a member of the 85 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework"> 86 * Java Collections Framework</a>. 87 * 88 * @since 1.7 89 * @author Doug Lea 90 * @param <E> the type of elements held in this queue 91 */ 92 public class LinkedTransferQueue<E> extends AbstractQueue<E> 93 implements TransferQueue<E>, java.io.Serializable { 94 private static final long serialVersionUID = -3223113410248163686L; 95 96 /* 97 * *** Overview of Dual Queues with Slack *** 98 * 99 * Dual Queues, introduced by Scherer and Scott 100 * (http://www.cs.rochester.edu/~scott/papers/2004_DISC_dual_DS.pdf) 101 * are (linked) queues in which nodes may represent either data or 102 * requests. When a thread tries to enqueue a data node, but 103 * encounters a request node, it instead "matches" and removes it; 104 * and vice versa for enqueuing requests. Blocking Dual Queues 105 * arrange that threads enqueuing unmatched requests block until 106 * other threads provide the match. Dual Synchronous Queues (see 107 * Scherer, Lea, & Scott 108 * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf) 109 * additionally arrange that threads enqueuing unmatched data also 110 * block. Dual Transfer Queues support all of these modes, as 111 * dictated by callers. All enqueue/dequeue operations can be 112 * handled by a single method (here, "xfer") with parameters 113 * indicating whether to act as some form of offer, put, poll, 114 * take, or transfer (each possibly with timeout), as described 115 * below. 116 * 117 * A FIFO dual queue may be implemented using a variation of the 118 * Michael & Scott (M&S) lock-free queue algorithm 119 * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf). 120 * It maintains two pointer fields, "head", pointing to a 121 * (matched) node that in turn points to the first actual 122 * (unmatched) queue node (or null if empty); and "tail" that 123 * points to the last node on the queue (or again null if 124 * empty). For example, here is a possible queue with four data 125 * elements: 126 * 127 * head tail 128 * | | 129 * v v 130 * M -> U -> U -> U -> U 131 * 132 * The M&S queue algorithm is known to be prone to scalability and 133 * overhead limitations when maintaining (via CAS) these head and 134 * tail pointers. To address these, dual queues with slack differ 135 * from plain M&S dual queues by virtue of only sometimes updating 136 * head or tail pointers when matching, appending, or even 137 * traversing nodes. 138 * 139 * In a dual queue, each node must atomically maintain its match 140 * status. Matching entails CASing an "item" field from a non-null 141 * data value to null upon match, and vice-versa for request 142 * nodes, CASing from null to a data value. (To reduce the need 143 * for re-reads, we use the compareAndExchange forms of CAS for 144 * pointer updates, that provide the current value to continue 145 * with on failure.) Note that the linearization properties of 146 * this style of queue are easy to verify -- elements are made 147 * available by linking, and unavailable by matching. 
Compared to 148 * plain M&S queues, this property of dual queues requires one 149 * additional successful atomic operation per enq/deq pair. But it 150 * also enables lower cost variants of queue maintenance 151 * mechanics. 152 * 153 * Once a node is matched, it is no longer live -- its match 154 * status can never again change. We may thus arrange that the 155 * linked list of them contain a prefix of zero or more dead 156 * nodes, followed by a suffix of zero or more live nodes. Note 157 * that we allow both the prefix and suffix to be zero length, 158 * which in turn means that we do not require a dummy header. 159 * 160 * We use here an approach that lies between the extremes of 161 * never versus always updating queue (head and tail) pointers. 162 * This offers a tradeoff between sometimes requiring extra 163 * traversal steps to locate the first and/or last unmatched 164 * nodes, versus the reduced overhead and contention of fewer 165 * updates to queue pointers. For example, a possible snapshot of 166 * a queue is: 167 * 168 * head tail 169 * | | 170 * v v 171 * M -> M -> U -> U -> U -> U 172 * 173 * The best value for this "slack" (the targeted maximum distance 174 * between the value of "head" and the first unmatched node, and 175 * similarly for "tail") is an empirical matter. Larger values 176 * introduce increasing costs of cache misses and risks of long 177 * traversal chains and out-of-order updates, while smaller values 178 * increase CAS contention and overhead. Using the smallest 179 * non-zero value of one is both simple and empirically a good 180 * choice in most applications. The slack value is hard-wired: a 181 * path greater than one is usually implemented by checking 182 * equality of traversal pointers. Because CASes updating these 183 * fields may stall, writes may appear out of 184 * order (an older CAS from the same head or tail may execute 185 * after a newer one), so the actual slack may exceed the targeted 186 * slack. To reduce impact, other threads may help update by 187 * unsplicing dead nodes while traversing. 188 * 189 * These ideas must be further extended to avoid unbounded amounts 190 * of costly-to-reclaim garbage caused by the sequential "next" 191 * links of nodes starting at old forgotten head nodes: As first 192 * described in detail by Boehm 193 * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a 194 * GC delays noticing that any arbitrarily old node has become 195 * garbage, all newer dead nodes will also be unreclaimed. 196 * (Similar issues arise in non-GC environments.) To cope with 197 * this in our implementation, upon advancing the head pointer, we 198 * set the "next" link of the previous head to point only to 199 * itself; thus limiting the length of chains of dead nodes. (We 200 * also take similar care to wipe out possibly garbage retaining 201 * values held in other node fields.) This is easy to accommodate 202 * in the primary xfer method, but adds a lot of complexity to 203 * Collection operations including traversal; mainly because if 204 * any "next" pointer links to itself, the current thread has 205 * lagged behind a head-update, and so must restart. 206 * 207 * *** Blocking *** 208 * 209 * The DualNode class is shared with class SynchronousQueue. It 210 * houses method await, which is used for all blocking control, as 211 * described below in DualNode internal documentation.
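 *
 * For reference, the public queue methods reduce to calls of the
 * single xfer method (summarizing the implementations later in
 * this file; see xfer's documentation for the meaning of the ns
 * argument):
 *
 *   put(e), offer(e), add(e),
 *   offer(e, timeout, unit)  -> xfer(e, -1L)             (async; never waits)
 *   tryTransfer(e)           -> xfer(e, 0L)              (immediate)
 *   tryTransfer(e, t, u)     -> xfer(e, nanos)           (timed wait)
 *   transfer(e)              -> xfer(e, Long.MAX_VALUE)  (untimed wait)
 *   poll()                   -> xfer(null, 0L)
 *   poll(t, u)               -> xfer(null, nanos)
 *   take()                   -> xfer(null, Long.MAX_VALUE)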
212 * 213 * ** Unlinking removed interior nodes ** 214 * 215 * In addition to minimizing garbage retention via self-linking 216 * described above, we also unlink removed interior nodes. These 217 * may arise due to timed out or interrupted waits, or calls to 218 * remove(x) or Iterator.remove. Normally, given a node that was 219 * at one time known to be the predecessor of some node s that is 220 * to be removed, we can unsplice s by CASing the next field of 221 * its predecessor if it still points to s (otherwise s must 222 * already have been removed or is now offlist). But there are two 223 * situations in which we cannot guarantee to make node s 224 * unreachable in this way: (1) If s is the trailing node of the list 225 * (i.e., with null next), then it is pinned as the target node 226 * for appends, so can only be removed later after other nodes are 227 * appended. (2) Unless we know it is already off-list, we cannot 228 * necessarily unlink s given a predecessor node that is matched 229 * (including the case of being cancelled): the predecessor may 230 * already be unspliced, in which case some previous reachable 231 * node may still point to s. (For further explanation see 232 * Herlihy & Shavit "The Art of Multiprocessor Programming" 233 * chapter 9). 234 * 235 * Without taking these into account, it would be possible for an 236 * unbounded number of supposedly removed nodes to remain reachable. 237 * Situations leading to such buildup are uncommon but can occur 238 * in practice; for example when a series of short timed calls to 239 * poll repeatedly time out at the trailing node but otherwise 240 * never fall off the list because of an untimed call to take() at 241 * the front of the queue. 242 * 243 * When these cases arise, rather than always retraversing the 244 * entire list to find an actual predecessor to unlink (which 245 * won't help for case (1) anyway), we record a conservative 246 * estimate of possible unsplice failures (in "sweepVotes"). 247 * We trigger a full sweep when the estimate exceeds a threshold 248 * ("SWEEP_THRESHOLD") indicating the maximum number of estimated 249 * removal failures to tolerate before sweeping through, unlinking 250 * cancelled nodes that were not unlinked upon initial removal. 251 * We perform sweeps by the thread hitting the threshold (rather than 252 * background threads or by spreading work to other threads) 253 * because in the main contexts in which removal occurs, the 254 * caller is timed-out or cancelled, which are not time-critical 255 * enough to warrant the overhead that alternatives would impose 256 * on other threads. 257 * 258 * Because the sweepVotes estimate is conservative, and because 259 * nodes become unlinked "naturally" as they fall off the head of 260 * the queue, and because we allow votes to accumulate even while 261 * sweeps are in progress, there are typically significantly fewer 262 * such nodes than estimated. 263 * 264 * Note that we cannot self-link unlinked interior nodes during 265 * sweeps. However, the associated garbage chains terminate when 266 * some successor ultimately falls off the head of the list and is 267 * self-linked. 268 * 269 * *** Revision notes *** 270 * 271 * This version differs from previous releases as follows: 272 * 273 * * Class DualNode replaces Qnode, with fields and methods 274 * that apply to any match-based dual data structure, and now 275 * usable in other j.u.c classes, in particular SynchronousQueue.
276 * * Blocking control (in class DualNode) accommodates 277 * VirtualThreads and (perhaps virtualized) uniprocessors. 278 * * All fields of this class (LinkedTransferQueue) are 279 * default-initializable (to null), allowing further extension 280 * (in particular, SynchronousQueue.Transferer). 281 * * Head and tail fields are lazily initialized rather than set 282 * to a dummy node, while also reducing retries under heavy 283 * contention and misorderings, and relaxing some accesses, 284 * requiring accommodation in many places (as well as 285 * adjustments in WhiteBox tests). 286 */ 287 288 /** 289 * Node for linked dual data structures. Uses type Object, not E, 290 * for items to allow cancellation and forgetting after use. Only 291 * field "item" is declared volatile (with bypasses for 292 * pre-publication and post-match writes), although field "next" 293 * is also CAS-able. Other accesses are constrained by context 294 * (including dependent chains of next's headed by a volatile 295 * read). 296 * 297 * This class also arranges blocking while awaiting matches. 298 * Control of blocking (and thread scheduling in general) for 299 * possibly-synchronous queues (and channels etc. constructed 300 * from them) must straddle two extremes: If there are too few 301 * underlying cores for a fulfilling party to continue, then 302 * the caller must park to cause a context switch. On the 303 * other hand, if the queue is busy with approximately the 304 * same number of independent producers and consumers, then 305 * that context switch may cause an order-of-magnitude 306 * slowdown. Many cases are somewhere in-between, in which 307 * case threads should try spinning and then give up and 308 * block. We deal with this as follows: 309 * 310 * 1. Callers to method await indicate eligibility for 311 * spinning when the node is either the only waiting node, or 312 * the next matchable node is still spinning. Otherwise, the 313 * caller may block (almost) immediately. 314 * 315 * 2. Even if eligible to spin, a caller blocks anyway in two 316 * cases where it is normally best: If the thread isVirtual, 317 * or the system is a uniprocessor. Uniprocessor status can 318 * vary over time (due to virtualization at other system 319 * levels), but checking Runtime availableProcessors can be 320 * slow and may itself acquire blocking locks, so we only 321 * occasionally (using ThreadLocalRandom) update when an 322 * otherwise-eligible spin elapses. 323 * 324 * 3. When enabled, spins should be long enough to cover 325 * bookkeeping overhead of almost-immediate fulfillments, but 326 * much less than the expected time of a (non-virtual) 327 * park/unpark context switch. The optimal value is 328 * unknowable, in part because the relative costs of 329 * Thread.onSpinWait versus park/unpark vary across platforms. 330 * The current value is an empirical compromise across tested 331 * platforms. 332 * 333 * 4. When using timed waits, callers spin instead of invoking 334 * timed park if the remaining time is less than the likely cost 335 * of park/unpark. This also avoids re-parks when timed park 336 * returns just barely too soon. As is the case in most j.u.c 337 * blocking support, untimed waits use ManagedBlockers when 338 * callers are ForkJoin threads, but timed waits use plain 339 * parkNanos, under the rationale that known-to-be transient 340 * blocking doesn't require compensation. (This decision should be 341 * revisited here and elsewhere to deal with very long timeouts.) 342 * 343 * 5.
Park/unpark signalling otherwise relies on a Dekker-like 344 * scheme in which the caller advertises the need to unpark by 345 * setting its waiter field, followed by a full fence and recheck 346 * before actually parking. An explicit fence is used here rather 347 * than unnecessarily requiring volatile accesses elsewhere. This 348 * fence also separates accesses to field isUniprocessor. 349 * 350 * 6. To make the above work, callers must precheck that 351 * timeouts are not already elapsed, and that interruptible 352 * operations were not already interrupted on call to the 353 * corresponding queue operation. Cancellation on timeout or 354 * interrupt otherwise proceeds by trying to fulfill with an 355 * impossible value (which is one reason that we use Object 356 * types here rather than typed fields). 357 */ 358 static final class DualNode implements ForkJoinPool.ManagedBlocker { 359 volatile Object item; // initially non-null if isData; CASed to match 360 DualNode next; // accessed only in chains of volatile ops 361 Thread waiter; // access order constrained by context 362 final boolean isData; // false if this is a request node 363 DualNode(Object item, boolean isData)364 DualNode(Object item, boolean isData) { 365 ITEM.set(this, item); // relaxed write before publication 366 this.isData = isData; 367 } 368 369 // Atomic updates cmpExItem(Object cmp, Object val)370 final Object cmpExItem(Object cmp, Object val) { // try to match 371 return ITEM.compareAndExchange(this, cmp, val); 372 } cmpExNext(DualNode cmp, DualNode val)373 final DualNode cmpExNext(DualNode cmp, DualNode val) { 374 return (DualNode)NEXT.compareAndExchange(this, cmp, val); 375 } 376 377 /** Returns true if this node has been matched or cancelled */ matched()378 final boolean matched() { 379 return isData != (item != null); 380 } 381 382 /** 383 * Relaxed write to replace reference to user data with 384 * self-link. Can be used only if not already null after 385 * match. 386 */ selfLinkItem()387 final void selfLinkItem() { 388 ITEM.set(this, this); 389 } 390 391 /** The number of times to spin when eligible */ 392 private static final int SPINS = 1 << 7; 393 394 /** 395 * The number of nanoseconds for which it is faster to spin 396 * rather than to use timed park. A rough estimate suffices. 397 */ 398 private static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1L << 10; 399 400 /** 401 * True if system is a uniprocessor, occasionally rechecked. 402 */ 403 private static boolean isUniprocessor = 404 (Runtime.getRuntime().availableProcessors() == 1); 405 406 /** 407 * Refresh rate (probability) for updating isUniprocessor 408 * field, to reduce the likelihood that multiple calls to await 409 * will contend invoking Runtime.availableProcessors. Must be 410 * a power of two minus one. 411 */ 412 private static final int UNIPROCESSOR_REFRESH_RATE = (1 << 5) - 1; 413 414 /** 415 * Possibly blocks until matched or caller gives up. 416 * 417 * @param e the comparison value for checking match 418 * @param ns timeout, or Long.MAX_VALUE if untimed 419 * @param blocker the LockSupport.setCurrentBlocker argument 420 * @param spin true if should spin when enabled 421 * @return matched item, or e if unmatched on interrupt or timeout 422 */ await(Object e, long ns, Object blocker, boolean spin)423 final Object await(Object e, long ns, Object blocker, boolean spin) { 424 Object m; // the match or e if none 425 boolean timed = (ns != Long.MAX_VALUE); 426 long deadline = (timed) ?
System.nanoTime() + ns : 0L; 427 boolean upc = isUniprocessor; // don't spin but later recheck 428 Thread w = Thread.currentThread(); 429 if (w.isVirtual()) // don't spin 430 spin = false; 431 int spins = (spin & !upc) ? SPINS : 0; // negative when may park 432 while ((m = item) == e) { 433 if (spins >= 0) { 434 if (--spins >= 0) 435 Thread.onSpinWait(); 436 else { // prepare to park 437 if (spin) // occasionally recheck 438 checkForUniprocessor(upc); 439 LockSupport.setCurrentBlocker(blocker); 440 waiter = w; // ensure ordering 441 VarHandle.fullFence(); 442 } 443 } else if (w.isInterrupted() || 444 (timed && // try to cancel with impossible match 445 ((ns = deadline - System.nanoTime()) <= 0L))) { 446 m = cmpExItem(e, (e == null) ? this : null); 447 break; 448 } else if (timed) { 449 if (ns < SPIN_FOR_TIMEOUT_THRESHOLD) 450 Thread.onSpinWait(); 451 else 452 LockSupport.parkNanos(ns); 453 } else if (w instanceof ForkJoinWorkerThread) { 454 try { 455 ForkJoinPool.managedBlock(this); 456 } catch (InterruptedException cannotHappen) { } 457 } else 458 LockSupport.park(); 459 } 460 if (spins < 0) { 461 LockSupport.setCurrentBlocker(null); 462 waiter = null; 463 } 464 return m; 465 } 466 467 /** Occasionally updates isUniprocessor field */ checkForUniprocessor(boolean prev)468 private void checkForUniprocessor(boolean prev) { 469 int r = ThreadLocalRandom.nextSecondarySeed(); 470 if ((r & UNIPROCESSOR_REFRESH_RATE) == 0) { 471 boolean u = (Runtime.getRuntime().availableProcessors() == 1); 472 if (u != prev) 473 isUniprocessor = u; 474 } 475 } 476 477 // ManagedBlocker support isReleasable()478 public final boolean isReleasable() { 479 return (matched() || Thread.currentThread().isInterrupted()); 480 } block()481 public final boolean block() { 482 while (!isReleasable()) LockSupport.park(); 483 return true; 484 } 485 486 // VarHandle mechanics 487 static final VarHandle ITEM; 488 static final VarHandle NEXT; 489 static { 490 try { 491 Class<?> tn = DualNode.class; 492 MethodHandles.Lookup l = MethodHandles.lookup(); 493 ITEM = l.findVarHandle(tn, "item", Object.class); 494 NEXT = l.findVarHandle(tn, "next", tn); 495 } catch (ReflectiveOperationException e) { 496 throw new ExceptionInInitializerError(e); 497 } 498 // Reduce the risk of rare disastrous classloading in first call to 499 // LockSupport.park: https://bugs.openjdk.org/browse/JDK-8074773 500 Class<?> ensureLoaded = LockSupport.class; 501 } 502 } 503 504 /** 505 * Unless empty (in which case possibly null), a node from which 506 * all live nodes are reachable. 507 * Invariants: 508 * - head is never self-linked 509 * Non-invariants: 510 * - head may or may not be live 511 * 512 * This field is used by subclass SynchronousQueue.Transferer to 513 * record the top of a Lifo stack, with tail always null, but 514 * otherwise maintaining the same properties. 515 */ 516 transient volatile DualNode head; 517 518 /** 519 * Unless null, a node from which the last node on list (that is, 520 * the unique node with node.next == null), if one exists, can be 521 * reached. 522 * Non-invariants: 523 * - tail may or may not be live 524 * - tail may be the same as head 525 * - tail may or may not be self-linked. 
526 * - tail may lag behind head, so need not be reachable from head 527 */ 528 transient volatile DualNode tail; 529 530 /** The number of apparent failures to unsplice cancelled nodes */ 531 transient volatile int sweepVotes; 532 533 // Atomic updates 534 cmpExTail(DualNode cmp, DualNode val)535 final DualNode cmpExTail(DualNode cmp, DualNode val) { 536 return (DualNode)TAIL.compareAndExchange(this, cmp, val); 537 } cmpExHead(DualNode cmp, DualNode val)538 final DualNode cmpExHead(DualNode cmp, DualNode val) { 539 return (DualNode)HEAD.compareAndExchange(this, cmp, val); 540 } 541 542 /** 543 * The maximum number of estimated removal failures (sweepVotes) 544 * to tolerate before sweeping through the queue unlinking 545 * dead nodes that were initially pinned. Must be a power of 546 * two minus one, at least 3. 547 */ 548 static final int SWEEP_THRESHOLD = (1 << 4) - 1; 549 550 /** 551 * Adds a sweepVote and returns true if triggered threshold. 552 */ sweepNow()553 final boolean sweepNow() { 554 return (SWEEP_THRESHOLD == 555 ((int)SWEEPVOTES.getAndAdd(this, 1) & (SWEEP_THRESHOLD))); 556 } 557 558 /** 559 * Implements all queuing methods. Loops, trying: 560 * 561 * * If not initialized, try to add new node (unless immediate) and exit 562 * * If tail has same mode, start traversing at tail for a likely 563 * append, else at head for a likely match 564 * * Traverse over dead or wrong-mode nodes until finding a spot 565 * to match/append, or falling off the list because of self-links. 566 * * On success, update head or tail if slacked, and possibly wait, 567 * depending on ns argument 568 * 569 * @param e the item or null for take 570 * @param ns timeout or negative if async, 0 if immediate, 571 * Long.MAX_VALUE if untimed 572 * @return an item if matched, else e 573 */ xfer(Object e, long ns)574 final Object xfer(Object e, long ns) { 575 boolean haveData = (e != null); 576 Object m; // the match or e if none 577 DualNode s = null, p; // enqueued node and its predecessor 578 restart: for (DualNode prevp = null;;) { 579 DualNode h, t, q; 580 if ((h = head) == null && // initialize unless immediate 581 (ns == 0L || 582 (h = cmpExHead(null, s = new DualNode(e, haveData))) == null)) { 583 p = null; // no predecessor 584 break; // else lost init race 585 } 586 p = (t = tail) != null && t.isData == haveData && t != prevp ? t : h; 587 prevp = p; // avoid known self-linked tail path 588 do { 589 m = p.item; 590 q = p.next; 591 if (p.isData != haveData && haveData != (m != null) && 592 p.cmpExItem(m, e) == m) { 593 Thread w = p.waiter; // matched complementary node 594 if (p != h && h == cmpExHead(h, (q == null) ? p : q)) 595 h.next = h; // advance head; self-link old 596 LockSupport.unpark(w); 597 return m; 598 } else if (q == null) { 599 if (ns == 0L) // try to append unless immediate 600 break restart; 601 if (s == null) 602 s = new DualNode(e, haveData); 603 if ((q = p.cmpExNext(null, s)) == null) { 604 if (p != t) 605 cmpExTail(t, s); 606 break restart; 607 } 608 } 609 } while (p != (p = q)); // restart if self-linked 610 } 611 if (s == null || ns <= 0L) 612 m = e; // don't wait 613 else if ((m = s.await(e, ns, this, // spin if at or near head 614 p == null || p.waiter == null)) == e) 615 unsplice(p, s); // cancelled 616 else if (m != null) 617 s.selfLinkItem(); 618 619 return m; 620 } 621 622 /* -------------- Removals -------------- */ 623 624 /** 625 * Unlinks (now or later) the given (non-live) node with given 626 * predecessor. See above for rationale. 
627 * 628 * @param pred if nonnull, a node that was at one time known to be the 629 * predecessor of s (else s may have been head) 630 * @param s the node to be unspliced 631 */ unsplice(DualNode pred, DualNode s)632 private void unsplice(DualNode pred, DualNode s) { 633 boolean seen = false; // try removing by collapsing head 634 for (DualNode h = head, p = h, f; p != null;) { 635 boolean matched; 636 if (p == s) 637 matched = seen = true; 638 else 639 matched = p.matched(); 640 if ((f = p.next) == p) 641 p = h = head; 642 else if (f != null && matched) 643 p = f; 644 else { 645 if (p != h && cmpExHead(h, p) == h) 646 h.next = h; // self-link 647 break; 648 } 649 } 650 DualNode sn; // try to unsplice if not pinned 651 if (!seen && 652 pred != null && pred.next == s && s != null && (sn = s.next) != s && 653 (sn == null || pred.cmpExNext(s, sn) != s || pred.matched()) && 654 sweepNow()) { // occasionally sweep if might not have been removed 655 for (DualNode p = head, f, n, u; 656 p != null && (f = p.next) != null && (n = f.next) != null;) { 657 p = (f == p ? head : // stale 658 !f.matched() ? f : // skip 659 f == (u = p.cmpExNext(f, n)) ? n : u); // unspliced 660 } 661 } 662 } 663 664 /** 665 * Tries to CAS pred.next (or head, if pred is null) from c to p. 666 * Caller must ensure that we're not unlinking the trailing node. 667 */ tryCasSuccessor(DualNode pred, DualNode c, DualNode p)668 final boolean tryCasSuccessor(DualNode pred, DualNode c, DualNode p) { 669 // assert p != null && c.matched() && c != p; 670 if (pred != null) 671 return pred.cmpExNext(c, p) == c; 672 else if (cmpExHead(c, p) != c) 673 return false; 674 if (c != null) 675 c.next = c; 676 677 return true; 678 } 679 680 /** 681 * Collapses dead (matched) nodes between pred and q. 682 * @param pred the last known live node, or null if none 683 * @param c the first dead node 684 * @param p the last dead node 685 * @param q p.next: the next live node, or null if at end 686 * @return pred if pred still alive and CAS succeeded; else p 687 */ skipDeadNodes(DualNode pred, DualNode c, DualNode p, DualNode q)688 final DualNode skipDeadNodes(DualNode pred, DualNode c, 689 DualNode p, DualNode q) { 690 // assert pred != c && p != q; && c.matched() && p.matched(); 691 if (q == null) { // Never unlink trailing node. 692 if (c == p) 693 return pred; 694 q = p; 695 } 696 return (tryCasSuccessor(pred, c, q) && (pred == null || !pred.matched())) 697 ? pred : p; 698 } 699 700 /** 701 * Tries to match the given object only if p is a data 702 * node. Signals waiter on success. 703 */ tryMatchData(DualNode p, Object x)704 final boolean tryMatchData(DualNode p, Object x) { 705 if (p != null && p.isData && 706 x != null && p.cmpExItem(x, null) == x) { 707 LockSupport.unpark(p.waiter); 708 return true; 709 } 710 return false; 711 } 712 713 /* -------------- Traversal methods -------------- */ 714 715 /** 716 * Returns the first unmatched data node, or null if none. 717 * Callers must recheck if the returned node is unmatched 718 * before using. 
719 */ firstDataNode()720 final DualNode firstDataNode() { 721 for (DualNode h = head, p = h, q, u; p != null;) { 722 boolean isData = p.isData; 723 Object item = p.item; 724 if (isData && item != null) // is live data 725 return p; 726 else if (!isData && item == null) // is live request 727 break; 728 else if ((q = p.next) == null) // end of list 729 break; 730 else if (p == q) // self-link; restart 731 p = h = head; 732 else if (p == h) // traverse past header 733 p = q; 734 else if ((u = cmpExHead(h, q)) != h) 735 p = h = u; // lost update race 736 else { 737 h.next = h; // collapse; self-link 738 p = h = q; 739 } 740 } 741 return null; 742 } 743 744 /** 745 * Traverses and counts unmatched nodes of the given mode. 746 * Used by methods size and getWaitingConsumerCount. 747 */ countOfMode(boolean data)748 final int countOfMode(boolean data) { 749 restartFromHead: for (;;) { 750 int count = 0; 751 for (DualNode p = head; p != null;) { 752 if (!p.matched()) { 753 if (p.isData != data) 754 return 0; 755 if (++count == Integer.MAX_VALUE) 756 break; // @see Collection.size() 757 } 758 if (p == (p = p.next)) 759 continue restartFromHead; 760 } 761 return count; 762 } 763 } 764 toString()765 public String toString() { 766 String[] a = null; 767 restartFromHead: for (;;) { 768 int charLength = 0; 769 int size = 0; 770 for (DualNode p = head; p != null;) { 771 Object item = p.item; 772 if (p.isData) { 773 if (item != null) { 774 if (a == null) 775 a = new String[4]; 776 else if (size == a.length) 777 a = Arrays.copyOf(a, 2 * size); 778 String s = item.toString(); 779 a[size++] = s; 780 charLength += s.length(); 781 } 782 } else if (item == null) 783 break; 784 if (p == (p = p.next)) 785 continue restartFromHead; 786 } 787 788 if (size == 0) 789 return "[]"; 790 791 return Helpers.toString(a, size, charLength); 792 } 793 } 794 toArrayInternal(Object[] a)795 private Object[] toArrayInternal(Object[] a) { 796 Object[] x = a; 797 restartFromHead: for (;;) { 798 int size = 0; 799 for (DualNode p = head; p != null;) { 800 Object item = p.item; 801 if (p.isData) { 802 if (item != null) { 803 if (x == null) 804 x = new Object[4]; 805 else if (size == x.length) 806 x = Arrays.copyOf(x, 2 * (size + 4)); 807 x[size++] = item; 808 } 809 } else if (item == null) 810 break; 811 if (p == (p = p.next)) 812 continue restartFromHead; 813 } 814 if (x == null) 815 return new Object[0]; 816 else if (a != null && size <= a.length) { 817 if (a != x) 818 System.arraycopy(x, 0, a, 0, size); 819 if (size < a.length) 820 a[size] = null; 821 return a; 822 } 823 return (size == x.length) ? x : Arrays.copyOf(x, size); 824 } 825 } 826 827 /** 828 * Returns an array containing all of the elements in this queue, in 829 * proper sequence. 830 * 831 * <p>The returned array will be "safe" in that no references to it are 832 * maintained by this queue. (In other words, this method must allocate 833 * a new array). The caller is thus free to modify the returned array. 834 * 835 * <p>This method acts as bridge between array-based and collection-based 836 * APIs. 837 * 838 * @return an array containing all of the elements in this queue 839 */ toArray()840 public Object[] toArray() { 841 return toArrayInternal(null); 842 } 843 844 /** 845 * Returns an array containing all of the elements in this queue, in 846 * proper sequence; the runtime type of the returned array is that of 847 * the specified array. If the queue fits in the specified array, it 848 * is returned therein. 
Otherwise, a new array is allocated with the 849 * runtime type of the specified array and the size of this queue. 850 * 851 * <p>If this queue fits in the specified array with room to spare 852 * (i.e., the array has more elements than this queue), the element in 853 * the array immediately following the end of the queue is set to 854 * {@code null}. 855 * 856 * <p>Like the {@link #toArray()} method, this method acts as bridge between 857 * array-based and collection-based APIs. Further, this method allows 858 * precise control over the runtime type of the output array, and may, 859 * under certain circumstances, be used to save allocation costs. 860 * 861 * <p>Suppose {@code x} is a queue known to contain only strings. 862 * The following code can be used to dump the queue into a newly 863 * allocated array of {@code String}: 864 * 865 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 866 * 867 * Note that {@code toArray(new Object[0])} is identical in function to 868 * {@code toArray()}. 869 * 870 * @param a the array into which the elements of the queue are to 871 * be stored, if it is big enough; otherwise, a new array of the 872 * same runtime type is allocated for this purpose 873 * @return an array containing all of the elements in this queue 874 * @throws ArrayStoreException if the runtime type of the specified array 875 * is not a supertype of the runtime type of every element in 876 * this queue 877 * @throws NullPointerException if the specified array is null 878 */ 879 @SuppressWarnings("unchecked") toArray(T[] a)880 public <T> T[] toArray(T[] a) { 881 Objects.requireNonNull(a); 882 return (T[]) toArrayInternal(a); 883 } 884 885 /** 886 * Weakly-consistent iterator. 887 * 888 * Lazily updated ancestor is expected to be amortized O(1) remove(), 889 * but O(n) in the worst case, when lastRet is concurrently deleted. 890 */ 891 final class Itr implements Iterator<E> { 892 private DualNode nextNode; // next node to return item for 893 private E nextItem; // the corresponding item 894 private DualNode lastRet; // last returned node, to support remove 895 private DualNode ancestor; // Helps unlink lastRet on remove() 896 897 /** 898 * Moves to next node after pred, or first node if pred null. 899 */ 900 @SuppressWarnings("unchecked") advance(DualNode pred)901 private void advance(DualNode pred) { 902 for (DualNode p = (pred == null) ? head : pred.next, c = p; 903 p != null; ) { 904 boolean isData = p.isData; 905 Object item = p.item; 906 if (isData && item != null) { 907 nextNode = p; 908 nextItem = (E) item; 909 if (c != p) 910 tryCasSuccessor(pred, c, p); 911 return; 912 } 913 else if (!isData && item == null) 914 break; 915 if (c != p && !tryCasSuccessor(pred, c, c = p)) { 916 pred = p; 917 c = p = p.next; 918 } 919 else if (p == (p = p.next)) { 920 pred = null; 921 c = p = head; 922 } 923 } 924 nextItem = null; 925 nextNode = null; 926 } 927 Itr()928 Itr() { 929 advance(null); 930 } 931 hasNext()932 public final boolean hasNext() { 933 return nextNode != null; 934 } 935 next()936 public final E next() { 937 DualNode p; 938 if ((p = nextNode) == null) throw new NoSuchElementException(); 939 E e = nextItem; 940 advance(lastRet = p); 941 return e; 942 } 943 forEachRemaining(Consumer<? super E> action)944 public void forEachRemaining(Consumer<? 
super E> action) { 945 Objects.requireNonNull(action); 946 DualNode q = null; 947 for (DualNode p; (p = nextNode) != null; advance(q = p)) 948 action.accept(nextItem); 949 if (q != null) 950 lastRet = q; 951 } 952 remove()953 public final void remove() { 954 final DualNode lastRet = this.lastRet; 955 if (lastRet == null) 956 throw new IllegalStateException(); 957 this.lastRet = null; 958 if (lastRet.item == null) // already deleted? 959 return; 960 // Advance ancestor, collapsing intervening dead nodes 961 DualNode pred = ancestor; 962 for (DualNode p = (pred == null) ? head : pred.next, c = p, q; 963 p != null; ) { 964 if (p == lastRet) { 965 tryMatchData(p, p.item); 966 if ((q = p.next) == null) q = p; 967 if (c != q) tryCasSuccessor(pred, c, q); 968 ancestor = pred; 969 return; 970 } 971 final Object item; final boolean pAlive; 972 if (pAlive = ((item = p.item) != null && p.isData)) { 973 // exceptionally, nothing to do 974 } 975 else if (!p.isData && item == null) 976 break; 977 if ((c != p && !tryCasSuccessor(pred, c, c = p)) || pAlive) { 978 pred = p; 979 c = p = p.next; 980 } 981 else if (p == (p = p.next)) { 982 pred = null; 983 c = p = head; 984 } 985 } 986 // traversal failed to find lastRet; must have been deleted; 987 // leave ancestor at original location to avoid overshoot; 988 // better luck next time! 989 990 // assert lastRet.matched(); 991 } 992 } 993 994 /** A customized variant of Spliterators.IteratorSpliterator */ 995 final class LTQSpliterator implements Spliterator<E> { 996 static final int MAX_BATCH = 1 << 25; // max batch array size; 997 DualNode current; // current node; null until initialized 998 int batch; // batch size for splits 999 boolean exhausted; // true when no more nodes LTQSpliterator()1000 LTQSpliterator() {} 1001 trySplit()1002 public Spliterator<E> trySplit() { 1003 DualNode p, q; 1004 if ((p = current()) == null || (q = p.next) == null) 1005 return null; 1006 int i = 0, n = batch = Math.min(batch + 1, MAX_BATCH); 1007 Object[] a = null; 1008 do { 1009 final Object item = p.item; 1010 if (p.isData) { 1011 if (item != null) { 1012 if (a == null) 1013 a = new Object[n]; 1014 a[i++] = item; 1015 } 1016 } else if (item == null) { 1017 p = null; 1018 break; 1019 } 1020 if (p == (p = q)) 1021 p = firstDataNode(); 1022 } while (p != null && (q = p.next) != null && i < n); 1023 setCurrent(p); 1024 return (i == 0) ? null : 1025 Spliterators.spliterator(a, 0, i, (Spliterator.ORDERED | 1026 Spliterator.NONNULL | 1027 Spliterator.CONCURRENT)); 1028 } 1029 forEachRemaining(Consumer<? super E> action)1030 public void forEachRemaining(Consumer<? super E> action) { 1031 Objects.requireNonNull(action); 1032 final DualNode p; 1033 if ((p = current()) != null) { 1034 current = null; 1035 exhausted = true; 1036 forEachFrom(action, p); 1037 } 1038 } 1039 1040 @SuppressWarnings("unchecked") tryAdvance(Consumer<? super E> action)1041 public boolean tryAdvance(Consumer<? 
super E> action) { 1042 Objects.requireNonNull(action); 1043 DualNode p; 1044 if ((p = current()) != null) { 1045 E e = null; 1046 do { 1047 boolean isData = p.isData; 1048 Object item = p.item; 1049 if (p == (p = p.next)) 1050 p = head; 1051 if (isData) { 1052 if (item != null) { 1053 e = (E) item; 1054 break; 1055 } 1056 } 1057 else if (item == null) 1058 p = null; 1059 } while (p != null); 1060 setCurrent(p); 1061 if (e != null) { 1062 action.accept(e); 1063 return true; 1064 } 1065 } 1066 return false; 1067 } 1068 setCurrent(DualNode p)1069 private void setCurrent(DualNode p) { 1070 if ((current = p) == null) 1071 exhausted = true; 1072 } 1073 current()1074 private DualNode current() { 1075 DualNode p; 1076 if ((p = current) == null && !exhausted) 1077 setCurrent(p = firstDataNode()); 1078 return p; 1079 } 1080 estimateSize()1081 public long estimateSize() { return Long.MAX_VALUE; } 1082 characteristics()1083 public int characteristics() { 1084 return (Spliterator.ORDERED | 1085 Spliterator.NONNULL | 1086 Spliterator.CONCURRENT); 1087 } 1088 } 1089 1090 /** 1091 * Returns a {@link Spliterator} over the elements in this queue. 1092 * 1093 * <p>The returned spliterator is 1094 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 1095 * 1096 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, 1097 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. 1098 * 1099 * @implNote 1100 * The {@code Spliterator} implements {@code trySplit} to permit limited 1101 * parallelism. 1102 * 1103 * @return a {@code Spliterator} over the elements in this queue 1104 * @since 1.8 1105 */ spliterator()1106 public Spliterator<E> spliterator() { 1107 return new LTQSpliterator(); 1108 } 1109 1110 /** 1111 * Creates an initially empty {@code LinkedTransferQueue}. 1112 */ LinkedTransferQueue()1113 public LinkedTransferQueue() { 1114 } 1115 1116 /** 1117 * Creates a {@code LinkedTransferQueue} 1118 * initially containing the elements of the given collection, 1119 * added in traversal order of the collection's iterator. 1120 * 1121 * @param c the collection of elements to initially contain 1122 * @throws NullPointerException if the specified collection or any 1123 * of its elements are null 1124 */ LinkedTransferQueue(Collection<? extends E> c)1125 public LinkedTransferQueue(Collection<? extends E> c) { 1126 DualNode h = null, t = null; 1127 for (E e : c) { 1128 DualNode newNode = new DualNode(Objects.requireNonNull(e), true); 1129 if (t == null) 1130 h = newNode; 1131 else 1132 t.next = newNode; 1133 t = newNode; 1134 } 1135 head = h; 1136 tail = t; 1137 } 1138 1139 /** 1140 * Inserts the specified element at the tail of this queue. 1141 * As the queue is unbounded, this method will never block. 1142 * 1143 * @throws NullPointerException if the specified element is null 1144 */ put(E e)1145 public void put(E e) { 1146 Objects.requireNonNull(e); 1147 xfer(e, -1L); 1148 } 1149 1150 /** 1151 * Inserts the specified element at the tail of this queue. 1152 * As the queue is unbounded, this method will never block or 1153 * return {@code false}. 
1154 * 1155 * @return {@code true} (as specified by 1156 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) 1157 * @throws NullPointerException if the specified element is null 1158 */ offer(E e, long timeout, TimeUnit unit)1159 public boolean offer(E e, long timeout, TimeUnit unit) { 1160 Objects.requireNonNull(e); 1161 xfer(e, -1L); 1162 return true; 1163 } 1164 1165 /** 1166 * Inserts the specified element at the tail of this queue. 1167 * As the queue is unbounded, this method will never return {@code false}. 1168 * 1169 * @return {@code true} (as specified by {@link Queue#offer}) 1170 * @throws NullPointerException if the specified element is null 1171 */ offer(E e)1172 public boolean offer(E e) { 1173 Objects.requireNonNull(e); 1174 xfer(e, -1L); 1175 return true; 1176 } 1177 1178 /** 1179 * Inserts the specified element at the tail of this queue. 1180 * As the queue is unbounded, this method will never throw 1181 * {@link IllegalStateException} or return {@code false}. 1182 * 1183 * @return {@code true} (as specified by {@link Collection#add}) 1184 * @throws NullPointerException if the specified element is null 1185 */ add(E e)1186 public boolean add(E e) { 1187 Objects.requireNonNull(e); 1188 xfer(e, -1L); 1189 return true; 1190 } 1191 1192 /** 1193 * Transfers the element to a waiting consumer immediately, if possible. 1194 * 1195 * <p>More precisely, transfers the specified element immediately 1196 * if there exists a consumer already waiting to receive it (in 1197 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), 1198 * otherwise returning {@code false} without enqueuing the element. 1199 * 1200 * @throws NullPointerException if the specified element is null 1201 */ tryTransfer(E e)1202 public boolean tryTransfer(E e) { 1203 Objects.requireNonNull(e); 1204 return xfer(e, 0L) == null; 1205 } 1206 1207 /** 1208 * Transfers the element to a consumer, waiting if necessary to do so. 1209 * 1210 * <p>More precisely, transfers the specified element immediately 1211 * if there exists a consumer already waiting to receive it (in 1212 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), 1213 * else inserts the specified element at the tail of this queue 1214 * and waits until the element is received by a consumer. 1215 * 1216 * @throws NullPointerException if the specified element is null 1217 */ transfer(E e)1218 public void transfer(E e) throws InterruptedException { 1219 Objects.requireNonNull(e); 1220 if (!Thread.interrupted()) { 1221 if (xfer(e, Long.MAX_VALUE) == null) 1222 return; 1223 Thread.interrupted(); // failure possible only due to interrupt 1224 } 1225 throw new InterruptedException(); 1226 } 1227 1228 /** 1229 * Transfers the element to a consumer if it is possible to do so 1230 * before the timeout elapses. 1231 * 1232 * <p>More precisely, transfers the specified element immediately 1233 * if there exists a consumer already waiting to receive it (in 1234 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), 1235 * else inserts the specified element at the tail of this queue 1236 * and waits until the element is received by a consumer, 1237 * returning {@code false} if the specified wait time elapses 1238 * before the element can be transferred. 
1239 * 1240 * @throws NullPointerException if the specified element is null 1241 */ tryTransfer(E e, long timeout, TimeUnit unit)1242 public boolean tryTransfer(E e, long timeout, TimeUnit unit) 1243 throws InterruptedException { 1244 Objects.requireNonNull(e); 1245 long nanos = Math.max(unit.toNanos(timeout), 0L); 1246 if (xfer(e, nanos) == null) 1247 return true; 1248 if (!Thread.interrupted()) 1249 return false; 1250 throw new InterruptedException(); 1251 } 1252 1253 @SuppressWarnings("unchecked") take()1254 public E take() throws InterruptedException { 1255 Object e; 1256 if (!Thread.interrupted()) { 1257 if ((e = xfer(null, Long.MAX_VALUE)) != null) 1258 return (E) e; 1259 Thread.interrupted(); 1260 } 1261 throw new InterruptedException(); 1262 } 1263 1264 @SuppressWarnings("unchecked") poll(long timeout, TimeUnit unit)1265 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 1266 Object e; 1267 long nanos = Math.max(unit.toNanos(timeout), 0L); 1268 if ((e = xfer(null, nanos)) != null || !Thread.interrupted()) 1269 return (E) e; 1270 throw new InterruptedException(); 1271 } 1272 1273 @SuppressWarnings("unchecked") poll()1274 public E poll() { 1275 return (E) xfer(null, 0L); 1276 } 1277 1278 /** 1279 * @throws NullPointerException {@inheritDoc} 1280 * @throws IllegalArgumentException {@inheritDoc} 1281 */ drainTo(Collection<? super E> c)1282 public int drainTo(Collection<? super E> c) { 1283 Objects.requireNonNull(c); 1284 if (c == this) 1285 throw new IllegalArgumentException(); 1286 int n = 0; 1287 for (E e; (e = poll()) != null; n++) 1288 c.add(e); 1289 return n; 1290 } 1291 1292 /** 1293 * @throws NullPointerException {@inheritDoc} 1294 * @throws IllegalArgumentException {@inheritDoc} 1295 */ drainTo(Collection<? super E> c, int maxElements)1296 public int drainTo(Collection<? super E> c, int maxElements) { 1297 Objects.requireNonNull(c); 1298 if (c == this) 1299 throw new IllegalArgumentException(); 1300 int n = 0; 1301 for (E e; n < maxElements && (e = poll()) != null; n++) 1302 c.add(e); 1303 return n; 1304 } 1305 1306 /** 1307 * Returns an iterator over the elements in this queue in proper sequence. 1308 * The elements will be returned in order from first (head) to last (tail). 1309 * 1310 * <p>The returned iterator is 1311 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 1312 * 1313 * @return an iterator over the elements in this queue in proper sequence 1314 */ iterator()1315 public Iterator<E> iterator() { 1316 return new Itr(); 1317 } 1318 peek()1319 public E peek() { 1320 restartFromHead: for (;;) { 1321 for (DualNode p = head; p != null;) { 1322 Object item = p.item; 1323 if (p.isData) { 1324 if (item != null) { 1325 @SuppressWarnings("unchecked") E e = (E) item; 1326 return e; 1327 } 1328 } 1329 else if (item == null) 1330 break; 1331 if (p == (p = p.next)) 1332 continue restartFromHead; 1333 } 1334 return null; 1335 } 1336 } 1337 1338 /** 1339 * Returns {@code true} if this queue contains no elements. 
1340 * 1341 * @return {@code true} if this queue contains no elements 1342 */ isEmpty()1343 public boolean isEmpty() { 1344 return firstDataNode() == null; 1345 } 1346 hasWaitingConsumer()1347 public boolean hasWaitingConsumer() { 1348 restartFromHead: for (;;) { 1349 for (DualNode p = head; p != null;) { 1350 Object item = p.item; 1351 if (p.isData) { 1352 if (item != null) 1353 break; 1354 } 1355 else if (item == null) 1356 return true; 1357 if (p == (p = p.next)) 1358 continue restartFromHead; 1359 } 1360 return false; 1361 } 1362 } 1363 1364 /** 1365 * Returns the number of elements in this queue. If this queue 1366 * contains more than {@code Integer.MAX_VALUE} elements, returns 1367 * {@code Integer.MAX_VALUE}. 1368 * 1369 * <p>Beware that, unlike in most collections, this method is 1370 * <em>NOT</em> a constant-time operation. Because of the 1371 * asynchronous nature of these queues, determining the current 1372 * number of elements requires an O(n) traversal. 1373 * 1374 * @return the number of elements in this queue 1375 */ size()1376 public int size() { 1377 return countOfMode(true); 1378 } 1379 getWaitingConsumerCount()1380 public int getWaitingConsumerCount() { 1381 return countOfMode(false); 1382 } 1383 1384 /** 1385 * Removes a single instance of the specified element from this queue, 1386 * if it is present. More formally, removes an element {@code e} such 1387 * that {@code o.equals(e)}, if this queue contains one or more such 1388 * elements. 1389 * Returns {@code true} if this queue contained the specified element 1390 * (or equivalently, if this queue changed as a result of the call). 1391 * 1392 * @param o element to be removed from this queue, if present 1393 * @return {@code true} if this queue changed as a result of the call 1394 */ remove(Object o)1395 public boolean remove(Object o) { 1396 if (o == null) return false; 1397 restartFromHead: for (;;) { 1398 for (DualNode p = head, pred = null; p != null; ) { 1399 boolean isData = p.isData; 1400 Object item = p.item; 1401 DualNode q = p.next; 1402 if (item != null) { 1403 if (isData) { 1404 if (o.equals(item) && tryMatchData(p, item)) { 1405 skipDeadNodes(pred, p, p, q); 1406 return true; 1407 } 1408 pred = p; p = q; continue; 1409 } 1410 } 1411 else if (!isData) 1412 break; 1413 for (DualNode c = p;; q = p.next) { 1414 if (q == null || !q.matched()) { 1415 pred = skipDeadNodes(pred, c, p, q); p = q; break; 1416 } 1417 if (p == (p = q)) continue restartFromHead; 1418 } 1419 } 1420 return false; 1421 } 1422 } 1423 1424 /** 1425 * Returns {@code true} if this queue contains the specified element. 1426 * More formally, returns {@code true} if and only if this queue contains 1427 * at least one element {@code e} such that {@code o.equals(e)}. 
1428 * 1429 * @param o object to be checked for containment in this queue 1430 * @return {@code true} if this queue contains the specified element 1431 */ contains(Object o)1432 public boolean contains(Object o) { 1433 if (o == null) return false; 1434 restartFromHead: for (;;) { 1435 for (DualNode p = head, pred = null; p != null; ) { 1436 boolean isData = p.isData; 1437 Object item = p.item; 1438 DualNode q = p.next; 1439 if (item != null) { 1440 if (isData) { 1441 if (o.equals(item)) 1442 return true; 1443 pred = p; p = q; continue; 1444 } 1445 } 1446 else if (!isData) 1447 break; 1448 for (DualNode c = p;; q = p.next) { 1449 if (q == null || !q.matched()) { 1450 pred = skipDeadNodes(pred, c, p, q); p = q; break; 1451 } 1452 if (p == (p = q)) continue restartFromHead; 1453 } 1454 } 1455 return false; 1456 } 1457 } 1458 1459 /** 1460 * Always returns {@code Integer.MAX_VALUE} because a 1461 * {@code LinkedTransferQueue} is not capacity constrained. 1462 * 1463 * @return {@code Integer.MAX_VALUE} (as specified by 1464 * {@link BlockingQueue#remainingCapacity()}) 1465 */ remainingCapacity()1466 public int remainingCapacity() { 1467 return Integer.MAX_VALUE; 1468 } 1469 1470 /** 1471 * Saves this queue to a stream (that is, serializes it). 1472 * 1473 * @param s the stream 1474 * @throws java.io.IOException if an I/O error occurs 1475 * @serialData All of the elements (each an {@code E}) in 1476 * the proper order, followed by a null 1477 */ writeObject(java.io.ObjectOutputStream s)1478 private void writeObject(java.io.ObjectOutputStream s) 1479 throws java.io.IOException { 1480 s.defaultWriteObject(); 1481 for (E e : this) 1482 s.writeObject(e); 1483 // Use trailing null as sentinel 1484 s.writeObject(null); 1485 } 1486 1487 /** 1488 * Reconstitutes this queue from a stream (that is, deserializes it). 1489 * @param s the stream 1490 * @throws ClassNotFoundException if the class of a serialized object 1491 * could not be found 1492 * @throws java.io.IOException if an I/O error occurs 1493 */ readObject(java.io.ObjectInputStream s)1494 private void readObject(java.io.ObjectInputStream s) 1495 throws java.io.IOException, ClassNotFoundException { 1496 1497 // Read in elements until trailing null sentinel found 1498 DualNode h = null, t = null; 1499 for (Object item; (item = s.readObject()) != null; ) { 1500 DualNode newNode = new DualNode(item, true); 1501 if (t == null) 1502 h = newNode; 1503 else 1504 t.next = newNode; 1505 t = newNode; 1506 } 1507 head = h; 1508 tail = t; 1509 } 1510 1511 /** 1512 * @throws NullPointerException {@inheritDoc} 1513 */ removeIf(Predicate<? super E> filter)1514 public boolean removeIf(Predicate<? super E> filter) { 1515 Objects.requireNonNull(filter); 1516 return bulkRemove(filter); 1517 } 1518 1519 /** 1520 * @throws NullPointerException {@inheritDoc} 1521 */ removeAll(Collection<?> c)1522 public boolean removeAll(Collection<?> c) { 1523 Objects.requireNonNull(c); 1524 return bulkRemove(e -> c.contains(e)); 1525 } 1526 1527 /** 1528 * @throws NullPointerException {@inheritDoc} 1529 */ retainAll(Collection<?> c)1530 public boolean retainAll(Collection<?> c) { 1531 Objects.requireNonNull(c); 1532 return bulkRemove(e -> !c.contains(e)); 1533 } 1534 clear()1535 public void clear() { 1536 bulkRemove(e -> true); 1537 } 1538 1539 /** 1540 * Tolerate this many consecutive dead nodes before CAS-collapsing. 1541 * Amortized cost of clear() is (1 + 1/MAX_HOPS) CASes per element. 
1542 */ 1543 private static final int MAX_HOPS = 8; 1544 1545 1546 /** Implementation of bulk remove methods. */ 1547 @SuppressWarnings("unchecked") bulkRemove(Predicate<? super E> filter)1548 private boolean bulkRemove(Predicate<? super E> filter) { 1549 boolean removed = false; 1550 restartFromHead: for (;;) { 1551 int hops = MAX_HOPS; 1552 // c will be CASed to collapse intervening dead nodes between 1553 // pred (or head if null) and p. 1554 for (DualNode p = head, c = p, pred = null, q; p != null; p = q) { 1555 boolean isData = p.isData, pAlive; 1556 Object item = p.item; 1557 q = p.next; 1558 if (pAlive = (item != null && isData)) { 1559 if (filter.test((E) item)) { 1560 if (tryMatchData(p, item)) 1561 removed = true; 1562 pAlive = false; 1563 } 1564 } 1565 else if (!isData && item == null) 1566 break; 1567 if (pAlive || q == null || --hops == 0) { 1568 // p might already be self-linked here, but if so: 1569 // - CASing head will surely fail 1570 // - CASing pred's next will be useless but harmless. 1571 if ((c != p && !tryCasSuccessor(pred, c, c = p)) || pAlive) { 1572 // if CAS failed or alive, abandon old pred 1573 hops = MAX_HOPS; 1574 pred = p; 1575 c = q; 1576 } 1577 } else if (p == q) 1578 continue restartFromHead; 1579 } 1580 return removed; 1581 } 1582 } 1583 1584 /** 1585 * Runs action on each element found during a traversal starting at p. 1586 * If p is null, the action is not run. 1587 */ 1588 @SuppressWarnings("unchecked") forEachFrom(Consumer<? super E> action, DualNode p)1589 void forEachFrom(Consumer<? super E> action, DualNode p) { 1590 for (DualNode pred = null; p != null; ) { 1591 boolean isData = p.isData; 1592 Object item = p.item; 1593 DualNode q = p.next; 1594 if (item != null) { 1595 if (isData) { 1596 action.accept((E) item); 1597 pred = p; p = q; continue; 1598 } 1599 } 1600 else if (!isData) 1601 break; 1602 for (DualNode c = p;; q = p.next) { 1603 if (q == null || !q.matched()) { 1604 pred = skipDeadNodes(pred, c, p, q); p = q; break; 1605 } 1606 if (p == (p = q)) { pred = null; p = head; break; } 1607 } 1608 } 1609 } 1610 1611 /** 1612 * @throws NullPointerException {@inheritDoc} 1613 */ forEach(Consumer<? super E> action)1614 public void forEach(Consumer<? super E> action) { 1615 Objects.requireNonNull(action); 1616 forEachFrom(action, head); 1617 } 1618 1619 // VarHandle mechanics 1620 static final VarHandle HEAD; 1621 static final VarHandle TAIL; 1622 static final VarHandle SWEEPVOTES; 1623 static { 1624 try { 1625 Class<?> ltq = LinkedTransferQueue.class, tn = DualNode.class; 1626 MethodHandles.Lookup l = MethodHandles.lookup(); 1627 HEAD = l.findVarHandle(ltq, "head", tn); 1628 TAIL = l.findVarHandle(ltq, "tail", tn); 1629 SWEEPVOTES = l.findVarHandle(ltq, "sweepVotes", int.class); 1630 } catch (ReflectiveOperationException e) { 1631 throw new ExceptionInInitializerError(e); 1632 } 1633 } 1634 } 1635