• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.AbstractQueue;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.Iterator;
44 import java.util.NoSuchElementException;
45 import java.util.Objects;
46 import java.util.Queue;
47 import java.util.Spliterator;
48 import java.util.Spliterators;
49 import java.util.concurrent.locks.LockSupport;
50 import java.util.concurrent.ForkJoinWorkerThread;
51 import java.util.function.Consumer;
52 import java.util.function.Predicate;
53 
54 /**
55  * An unbounded {@link TransferQueue} based on linked nodes.
56  * This queue orders elements FIFO (first-in-first-out) with respect
57  * to any given producer.  The <em>head</em> of the queue is that
58  * element that has been on the queue the longest time for some
59  * producer.  The <em>tail</em> of the queue is that element that has
60  * been on the queue the shortest time for some producer.
61  *
62  * <p>Beware that, unlike in most collections, the {@code size} method
63  * is <em>NOT</em> a constant-time operation. Because of the
64  * asynchronous nature of these queues, determining the current number
65  * of elements requires a traversal of the elements, and so may report
66  * inaccurate results if this collection is modified during traversal.
67  *
68  * <p>Bulk operations that add, remove, or examine multiple elements,
69  * such as {@link #addAll}, {@link #removeIf} or {@link #forEach},
70  * are <em>not</em> guaranteed to be performed atomically.
71  * For example, a {@code forEach} traversal concurrent with an {@code
72  * addAll} operation might observe only some of the added elements.
73  *
74  * <p>This class and its iterator implement all of the <em>optional</em>
75  * methods of the {@link Collection} and {@link Iterator} interfaces.
76  *
77  * <p>Memory consistency effects: As with other concurrent
78  * collections, actions in a thread prior to placing an object into a
79  * {@code LinkedTransferQueue}
80  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
81  * actions subsequent to the access or removal of that element from
82  * the {@code LinkedTransferQueue} in another thread.
83  *
84  * <p>This class is a member of the
85  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
86  * Java Collections Framework</a>.
87  *
88  * @since 1.7
89  * @author Doug Lea
90  * @param <E> the type of elements held in this queue
91  */
92 public class LinkedTransferQueue<E> extends AbstractQueue<E>
93     implements TransferQueue<E>, java.io.Serializable {
94     private static final long serialVersionUID = -3223113410248163686L;
95 
96     /*
97      * *** Overview of Dual Queues with Slack ***
98      *
99      * Dual Queues, introduced by Scherer and Scott
100      * (http://www.cs.rochester.edu/~scott/papers/2004_DISC_dual_DS.pdf)
101      * are (linked) queues in which nodes may represent either data or
102      * requests.  When a thread tries to enqueue a data node, but
103      * encounters a request node, it instead "matches" and removes it;
104      * and vice versa for enqueuing requests. Blocking Dual Queues
105      * arrange that threads enqueuing unmatched requests block until
106      * other threads provide the match. Dual Synchronous Queues (see
107      * Scherer, Lea, & Scott
108      * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
109      * additionally arrange that threads enqueuing unmatched data also
110      * block.  Dual Transfer Queues support all of these modes, as
111      * dictated by callers. All enqueue/dequeue operations can be
112      * handled by a single method (here, "xfer") with parameters
113      * indicating whether to act as some form of offer, put, poll,
114      * take, or transfer (each possibly with timeout), as described
115      * below.
116      *
117      * A FIFO dual queue may be implemented using a variation of the
118      * Michael & Scott (M&S) lock-free queue algorithm
119      * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf).
120      * It maintains two pointer fields, "head", pointing to a
121      * (matched) node that in turn points to the first actual
122      * (unmatched) queue node (or null if empty); and "tail" that
123      * points to the last node on the queue (or again null if
124      * empty). For example, here is a possible queue with four data
125      * elements:
126      *
127      *  head                tail
128      *    |                   |
129      *    v                   v
130      *    M -> U -> U -> U -> U
131      *
132      * The M&S queue algorithm is known to be prone to scalability and
133      * overhead limitations when maintaining (via CAS) these head and
134      * tail pointers. To address these, dual queues with slack differ
135      * from plain M&S dual queues by virtue of only sometimes updating
136      * head or tail pointers when matching, appending, or even
137      * traversing nodes.
138      *
139      * In a dual queue, each node must atomically maintain its match
140      * status. Matching entails CASing an "item" field from a non-null
141      * data value to null upon match, and vice-versa for request
142      * nodes, CASing from null to a data value.  (To reduce the need
143      * for re-reads, we use the compareAndExchange forms of CAS for
144      * pointer updates, that provide the current value to continue
145      * with on failure.)  Note that the linearization properties of
146      * this style of queue are easy to verify -- elements are made
147      * available by linking, and unavailable by matching. Compared to
148      * plain M&S queues, this property of dual queues requires one
149      * additional successful atomic operation per enq/deq pair. But it
150      * also enables lower cost variants of queue maintenance
151      * mechanics.
152      *
153      * Once a node is matched, it is no longer live -- its match
154      * status can never again change.  We may thus arrange that the
155      * linked list of them contain a prefix of zero or more dead
156      * nodes, followed by a suffix of zero or more live nodes. Note
157      * that we allow both the prefix and suffix to be zero length,
158      * which in turn means that we do not require a dummy header.
159      *
160      * We use here an approach that lies between the extremes of
161      * never versus always updating queue (head and tail) pointers.
162      * This offers a tradeoff between sometimes requiring extra
163      * traversal steps to locate the first and/or last unmatched
164      * nodes, versus the reduced overhead and contention of fewer
165      * updates to queue pointers. For example, a possible snapshot of
166      * a queue is:
167      *
168      *  head           tail
169      *    |              |
170      *    v              v
171      *    M -> M -> U -> U -> U -> U
172      *
173      * The best value for this "slack" (the targeted maximum distance
174      * between the value of "head" and the first unmatched node, and
175      * similarly for "tail") is an empirical matter. Larger values
176      * introduce increasing costs of cache misses and risks of long
177      * traversal chains and out-of-order updates, while smaller values
178      * increase CAS contention and overhead. Using the smallest
179      * non-zero value of one is both simple and empirically a good
180      * choice in most applications.  The slack value is hard-wired: a
181      * path greater than one is usually implemented by checking
182      * equality of traversal pointers.  Because CASes updating fields
183      * attempting to do so may stall, the writes may appear out of
184      * order (an older CAS from the same head or tail may execute
185      * after a newer one), the actual slack may exceed targeted
186      * slack. To reduce impact, other threads may help update by
187      * unsplicing dead nodes while traversing.
188      *
189      * These ideas must be further extended to avoid unbounded amounts
190      * of costly-to-reclaim garbage caused by the sequential "next"
191      * links of nodes starting at old forgotten head nodes: As first
192      * described in detail by Boehm
193      * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a
194      * GC delays noticing that any arbitrarily old node has become
195      * garbage, all newer dead nodes will also be unreclaimed.
196      * (Similar issues arise in non-GC environments.)  To cope with
197      * this in our implementation, upon advancing the head pointer, we
198      * set the "next" link of the previous head to point only to
199      * itself; thus limiting the length of chains of dead nodes.  (We
200      * also take similar care to wipe out possibly garbage retaining
201      * values held in other node fields.) This is easy to accommodate
202      * in the primary xfer method, but adds a lot of complexity to
203      * Collection operations including traversal; mainly because if
204      * any "next" pointer links to itself, the current thread has
205      * lagged behind a head-update, and so must restart.
206      *
207      * *** Blocking ***
208      *
209      * The DualNode class is shared with class SynchronousQueue. It
210      * houses method await, which is used for all blocking control, as
211      * described below in DualNode internal documentation.
212      *
213      * ** Unlinking removed interior nodes **
214      *
215      * In addition to minimizing garbage retention via self-linking
216      * described above, we also unlink removed interior nodes. These
217      * may arise due to timed out or interrupted waits, or calls to
218      * remove(x) or Iterator.remove.  Normally, given a node that was
219      * at one time known to be the predecessor of some node s that is
220      * to be removed, we can unsplice s by CASing the next field of
221      * its predecessor if it still points to s (otherwise s must
222      * already have been removed or is now offlist). But there are two
223      * situations in which we cannot guarantee to make node s
224      * unreachable in this way: (1) If s is the trailing node of list
225      * (i.e., with null next), then it is pinned as the target node
226      * for appends, so can only be removed later after other nodes are
227      * appended. (2) Unless we know it is already off-list, we cannot
228      * necessarily unlink s given a predecessor node that is matched
229      * (including the case of being cancelled): the predecessor may
230      * already be unspliced, in which case some previous reachable
231      * node may still point to s.  (For further explanation see
232      * Herlihy & Shavit "The Art of Multiprocessor Programming"
233      * chapter 9).
234      *
235      * Without taking these into account, it would be possible for an
236      * unbounded number of supposedly removed nodes to remain reachable.
237      * Situations leading to such buildup are uncommon but can occur
238      * in practice; for example when a series of short timed calls to
239      * poll repeatedly time out at the trailing node but otherwise
240      * never fall off the list because of an untimed call to take() at
241      * the front of the queue.
242      *
243      * When these cases arise, rather than always retraversing the
244      * entire list to find an actual predecessor to unlink (which
245      * won't help for case (1) anyway), we record a conservative
246      * estimate of possible unsplice failures (in "sweepVotes").
247      * We trigger a full sweep when the estimate exceeds a threshold
248      * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
249      * removal failures to tolerate before sweeping through, unlinking
250      * cancelled nodes that were not unlinked upon initial removal.
251      * We perform sweeps by the thread hitting threshold (rather than
252      * background threads or by spreading work to other threads)
253      * because in the main contexts in which removal occurs, the
254      * caller is timed-out or cancelled, which are not time-critical
255      * enough to warrant the overhead that alternatives would impose
256      * on other threads.
257      *
258      * Because the sweepVotes estimate is conservative, and because
259      * nodes become unlinked "naturally" as they fall off the head of
260      * the queue, and because we allow votes to accumulate even while
261      * sweeps are in progress, there are typically significantly fewer
262      * such nodes than estimated.
263      *
264      * Note that we cannot self-link unlinked interior nodes during
265      * sweeps. However, the associated garbage chains terminate when
266      * some successor ultimately falls off the head of the list and is
267      * self-linked.
268      *
269      * *** Revision notes ***
270      *
271      * This version differs from previous releases as follows:
272      *
273      * * Class DualNode replaces Qnode, with fields and methods
274      *   that apply to any match-based dual data structure, and now
275      *   usable in other j.u.c classes, in particular SynchronousQueue.
276      * * Blocking control (in class DualNode) accommodates
277      *   VirtualThreads and (perhaps virtualized) uniprocessors.
278      * * All fields of this class (LinkedTransferQueue) are
279      *   default-initializable (to null), allowing further extension
280      *   (in particular, SynchronousQueue.Transferer)
281      * * Head and tail fields are lazily initialized rather than set
282      *   to a dummy node, while also reducing retries under heavy
283      *   contention and misorderings, and relaxing some accesses,
284      *   requiring accommodation in many places (as well as
285      *   adjustments in WhiteBox tests).
286      */
287 
288     /**
289      * Node for linked dual data structures. Uses type Object, not E,
290      * for items to allow cancellation and forgetting after use. Only
291      * field "item" is declared volatile (with bypasses for
292      * pre-publication and post-match writes), although field "next"
293      * is also CAS-able. Other accesses are constrained by context
294      * (including dependent chains of next's headed by a volatile
295      * read).
296      *
297      * This class also arranges blocking while awaiting matches.
298      * Control of blocking (and thread scheduling in general) for
299      * possibly-synchronous queues (and channels etc constructed
300      * from them) must straddle two extremes: If there are too few
301      * underlying cores for a fulfilling party to continue, then
302      * the caller must park to cause a context switch. On the
303      * other hand, if the queue is busy with approximately the
304      * same number of independent producers and consumers, then
305      * that context switch may cause an order-of-magnitude
306      * slowdown. Many cases are somewhere in-between, in which
307      * case threads should try spinning and then give up and
308      * block. We deal with this as follows:
309      *
310      * 1. Callers to method await indicate eligibility for
311      * spinning when the node is either the only waiting node, or
312      * the next matchable node is still spinning.  Otherwise, the
313      * caller may block (almost) immediately.
314      *
315      * 2. Even if eligible to spin, a caller blocks anyway in two
316      * cases where it is normally best: If the thread isVirtual,
317      * or the system is a uniprocessor. Uniprocessor status can
318      * vary over time (due to virtualization at other system
319      * levels), but checking Runtime availableProcessors can be
320      * slow and may itself acquire blocking locks, so we only
321      * occasionally (using ThreadLocalRandom) update when an
322      * otherwise-eligible spin elapses.
323      *
324      * 3. When enabled, spins should be long enough to cover
325      * bookkeeping overhead of almost-immediate fulfillments, but
326      * much less than the expected time of a (non-virtual)
327      * park/unpark context switch.  The optimal value is
328      * unknowable, in part because the relative costs of
329      * Thread.onSpinWait versus park/unpark vary across platforms.
330      * The current value is an empirical compromise across tested
331      * platforms.
332      *
333      * 4. When using timed waits, callers spin instead of invoking
334      * timed park if the remaining time is less than the likely cost
335      * of park/unpark. This also avoids re-parks when timed park
336      * returns just barely too soon. As is the case in most j.u.c
337      * blocking support, untimed waits use ManagedBlockers when
338      * callers are ForkJoin threads, but timed waits use plain
339      * parkNanos, under the rationale that known-to-be transient
340      * blocking doesn't require compensation. (This decision should be
341      * revisited here and elsewhere to deal with very long timeouts.)
342      *
343      * 5. Park/unpark signalling otherwise relies on a Dekker-like
344      * scheme in which the caller advertises the need to unpark by
345      * setting its waiter field, followed by a full fence and recheck
346      * before actually parking. An explicit fence is used here rather
347      * than unnecessarily requiring volatile accesses elsewhere. This
348      * fence also separates accesses to field isUniprocessor.
349      *
350      * 6. To make the above work, callers must precheck that
351      * timeouts are not already elapsed, and that interruptible
352      * operations were not already interrupted on call to the
353      * corresponding queue operation.  Cancellation on timeout or
354      * interrupt otherwise proceeds by trying to fulfill with an
355      * impossible value (which is one reason that we use Object
356      * types here rather than typed fields).
357      */
358     static final class DualNode implements ForkJoinPool.ManagedBlocker {
359         volatile Object item;   // initially non-null if isData; CASed to match
360         DualNode next;          // accessed only in chains of volatile ops
361         Thread waiter;          // access order constrained by context
362         final boolean isData;   // false if this is a request node
363 
            // Creates a node holding the given item; isData records whether
            // this is a data node (true) or a request node (false).
DualNode(Object item, boolean isData)364         DualNode(Object item, boolean isData) {
365             ITEM.set(this, item); // relaxed write before publication
366             this.isData = isData;
367         }
368 
369         // Atomic updates
            // Both methods return the witness value of the compare-and-exchange:
            // it equals cmp exactly when the update succeeded, otherwise it is
            // the value that was found in the field.
cmpExItem(Object cmp, Object val)370         final Object cmpExItem(Object cmp, Object val) { // try to match
371             return ITEM.compareAndExchange(this, cmp, val);
372         }
cmpExNext(DualNode cmp, DualNode val)373         final DualNode cmpExNext(DualNode cmp, DualNode val) {
374             return (DualNode)NEXT.compareAndExchange(this, cmp, val);
375         }
376 
377         /** Returns true if this node has been matched or cancelled  */
matched()378         final boolean matched() {
                // A data node starts with a non-null item and a request node
                // with a null item, so the node is no longer live once the
                // nullness of item disagrees with isData.
379             return isData != (item != null);
380         }
381 
382         /**
383          * Relaxed write to replace reference to user data with
384          * self-link. Can be used only if not already null after
385          * match.
386          */
selfLinkItem()387         final void selfLinkItem() {
388             ITEM.set(this, this);
389         }
390 
391         /** The number of times to spin when eligible */
392         private static final int SPINS = 1 << 7;
393 
394         /**
395          * The number of nanoseconds for which it is faster to spin
396          * rather than to use timed park. A rough estimate suffices.
397          */
398         private static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1L << 10;
399 
400         /**
401          * True if system is a uniprocessor, occasionally rechecked.
402          */
403         private static boolean isUniprocessor =
404             (Runtime.getRuntime().availableProcessors() == 1);
405 
406         /**
407          * Refresh rate (probability) for updating isUniprocessor
408          * field, to reduce the likelihood that multiple calls to await
409          * will contend invoking Runtime.availableProcessors.  Must be
410          * a power of two minus one.
411          */
412         private static final int UNIPROCESSOR_REFRESH_RATE = (1 << 5) - 1;
413 
414         /**
415          * Possibly blocks until matched or caller gives up.
416          *
417          * @param e the comparison value for checking match
418          * @param ns timeout, or Long.MAX_VALUE if untimed
419          * @param blocker the LockSupport.setCurrentBlocker argument
420          * @param spin true if should spin when enabled
421          * @return matched item, or e if unmatched on interrupt or timeout
422          */
await(Object e, long ns, Object blocker, boolean spin)423         final Object await(Object e, long ns, Object blocker, boolean spin) {
424             Object m;                      // the match or e if none
425             boolean timed = (ns != Long.MAX_VALUE);
                // The absolute deadline is only meaningful (and only read) when timed.
426             long deadline = (timed) ? System.nanoTime() + ns : 0L;
427             boolean upc = isUniprocessor;  // don't spin but later recheck
428             Thread w = Thread.currentThread();
429             if (w.isVirtual())             // don't spin
430                 spin = false;
431             int spins = (spin & !upc) ? SPINS : 0; // negative when may park
                // Loop for as long as item still holds the value this node was
                // waiting with, i.e. nobody has matched (or cancelled) it yet.
432             while ((m = item) == e) {
433                 if (spins >= 0) {
                        // Spin phase. The decrement that takes spins below zero
                        // runs the one-time park preparation in the else branch;
                        // all later iterations take the branches below instead.
434                     if (--spins >= 0)
435                         Thread.onSpinWait();
436                     else {                 // prepare to park
437                         if (spin)          // occasionally recheck
438                             checkForUniprocessor(upc);
439                         LockSupport.setCurrentBlocker(blocker);
440                         waiter = w;        // ensure ordering
                            // Publish waiter before the loop condition re-reads item.
441                         VarHandle.fullFence();
442                     }
443                 } else if (w.isInterrupted() ||
444                            (timed &&       // try to cancel with impossible match
445                             ((ns = deadline - System.nanoTime()) <= 0L))) {
                        // Cancel by CASing item to a value no real match can produce:
                        // this node itself for a request (e == null), null for data.
                        // m receives the witness: e if the cancel won, otherwise the
                        // value stored by a match that got in first.
446                     m = cmpExItem(e, (e == null) ? this : null);
447                     break;
448                 } else if (timed) {
                        // ns was refreshed by the deadline check just above.
449                     if (ns < SPIN_FOR_TIMEOUT_THRESHOLD)
450                         Thread.onSpinWait();
451                     else
452                         LockSupport.parkNanos(ns);
453                 } else if (w instanceof ForkJoinWorkerThread) {
454                     try {
455                         ForkJoinPool.managedBlock(this);
                        // block() below declares no InterruptedException; an interrupt
                        // makes isReleasable() true and is then handled by the
                        // isInterrupted() branch on the next iteration.
456                     } catch (InterruptedException cannotHappen) { }
457                 } else
458                     LockSupport.park();
459             }
                // spins < 0 means the park preparation ran, so undo the blocker
                // and waiter settings it made.
460             if (spins < 0) {
461                 LockSupport.setCurrentBlocker(null);
462                 waiter = null;
463             }
464             return m;
465         }
466 
467         /** Occasionally updates isUniprocessor field */
checkForUniprocessor(boolean prev)468         private void checkForUniprocessor(boolean prev) {
469             int r = ThreadLocalRandom.nextSecondarySeed();
                // Only re-query the processor count when the low five bits of r are
                // all zero (about one call in 32 if those bits are uniform -- the
                // distribution of nextSecondarySeed is not visible here).
470             if ((r & UNIPROCESSOR_REFRESH_RATE) == 0) {
471                 boolean u = (Runtime.getRuntime().availableProcessors() == 1);
                    // prev is the value the caller read earlier; write the static
                    // field only when the answer differs from it.
472                 if (u != prev)
473                     isUniprocessor = u;
474             }
475         }
476 
477         // ManagedBlocker support
            // Releasable once this node is matched/cancelled or the current
            // thread has been interrupted.
isReleasable()478         public final boolean isReleasable() {
479             return (matched() || Thread.currentThread().isInterrupted());
480         }
            // Parks until isReleasable() holds; always returns true.
block()481         public final boolean block() {
482             while (!isReleasable()) LockSupport.park();
483             return true;
484         }
485 
486         // VarHandle mechanics
            // ITEM and NEXT give atomic/relaxed access to the item and next
            // fields of this class; they are looked up once in the static
            // initializer below.
487         static final VarHandle ITEM;
488         static final VarHandle NEXT;
489         static {
490             try {
491                 Class<?> tn = DualNode.class;
492                 MethodHandles.Lookup l = MethodHandles.lookup();
493                 ITEM = l.findVarHandle(tn, "item", Object.class);
494                 NEXT = l.findVarHandle(tn, "next", tn);
495             } catch (ReflectiveOperationException e) {
496                 throw new ExceptionInInitializerError(e);
497             }
498             // Reduce the risk of rare disastrous classloading in first call to
499             // LockSupport.park: https://bugs.openjdk.org/browse/JDK-8074773
500             Class<?> ensureLoaded = LockSupport.class;
501         }
502     }
503 
504     /**
505      * Unless empty (in which case possibly null), a node from which
506      * all live nodes are reachable.
507      * Invariants:
508      * - head is never self-linked
509      * Non-invariants:
510      * - head may or may not be live
511      *
512      * This field is used by subclass SynchronousQueue.Transferer to
513      * record the top of a Lifo stack, with tail always null, but
514      * otherwise maintaining the same properties.
515      */
516     transient volatile DualNode head; // null until xfer installs the first node
517 
518     /**
519      * Unless null, a node from which the last node on list (that is,
520      * the unique node with node.next == null), if one exists, can be
521      * reached.
522      * Non-invariants:
523      * - tail may or may not be live
524      * - tail may be the same as head
525      * - tail may or may not be self-linked.
526      * - tail may lag behind head, so need not be reachable from head
527      */
528     transient volatile DualNode tail; // xfer updates it only on some appends (slack)
529 
530     /** The number of apparent failures to unsplice cancelled nodes */
531     transient volatile int sweepVotes; // presumably the field behind SWEEPVOTES used by sweepNow() -- confirm
532 
533     // Atomic updates
534 
cmpExTail(DualNode cmp, DualNode val)535     final DualNode cmpExTail(DualNode cmp, DualNode val) {
            // Compare-and-exchange through the TAIL VarHandle (declared outside
            // this excerpt; presumably bound to the tail field). Returns the
            // witness value, which equals cmp exactly when the update succeeded.
536         return (DualNode)TAIL.compareAndExchange(this, cmp, val);
537     }
cmpExHead(DualNode cmp, DualNode val)538     final DualNode cmpExHead(DualNode cmp, DualNode val) {
            // Compare-and-exchange through the HEAD VarHandle (declared outside
            // this excerpt; presumably bound to the head field). Returns the
            // witness value, which equals cmp exactly when the update succeeded.
539         return (DualNode)HEAD.compareAndExchange(this, cmp, val);
540     }
541 
542     /**
543      * The maximum number of estimated removal failures (sweepVotes)
544      * to tolerate before sweeping through the queue unlinking
545      * dead nodes that were initially pinned.  Must be a power of
546      * two minus one, at least 3.
547      */
548     static final int SWEEP_THRESHOLD = (1 << 4) - 1; // 15; doubles as a bit mask in sweepNow()
549 
550     /**
551      * Adds a sweepVote and returns true if triggered threshold.
552      */
sweepNow()553     final boolean sweepNow() {
            // getAndAdd yields the vote count from before this increment. The
            // result is true when the low bits of that count (masked by
            // SWEEP_THRESHOLD) are all ones, i.e. once every SWEEP_THRESHOLD + 1
            // votes. SWEEPVOTES is declared outside this excerpt.
554         return (SWEEP_THRESHOLD ==
555                 ((int)SWEEPVOTES.getAndAdd(this, 1) & (SWEEP_THRESHOLD)));
556     }
557 
558     /**
559      * Implements all queuing methods. Loops, trying:
560      *
561      * * If not initialized, try to add new node (unless immediate) and exit
562      * * If tail has same mode, start traversing at tail for a likely
563      *   append, else at head for a likely match
564      * * Traverse over dead or wrong-mode nodes until finding a spot
565      *   to match/append, or falling off the list because of self-links.
566      * * On success, update head or tail if slacked, and possibly wait,
567      *   depending on ns argument
568      *
569      * @param e the item or null for take
570      * @param ns timeout or negative if async, 0 if immediate,
571      *        Long.MAX_VALUE if untimed
572      * @return an item if matched, else e
573      */
xfer(Object e, long ns)574     final Object xfer(Object e, long ns) {
575         boolean haveData = (e != null);
576         Object m;                           // the match or e if none
577         DualNode s = null, p;               // enqueued node and its predecessor
            // Each pass of the outer loop picks a starting node and then walks
            // "next" links in the inner loop; prevp remembers the previous
            // starting node so a stale tail is not chosen twice in a row.
578         restart: for (DualNode prevp = null;;) {
579             DualNode h, t, q;
                // Empty queue: an immediate call (ns == 0) leaves with s == null;
                // otherwise try to install a new node as head. A null witness
                // means the CAS succeeded; a non-null witness is the head that
                // another thread installed, and traversal continues from it.
580             if ((h = head) == null &&       // initialize unless immediate
581                 (ns == 0L ||
582                  (h = cmpExHead(null, s = new DualNode(e, haveData))) == null)) {
583                 p = null;                   // no predecessor
584                 break;                      // else lost init race
585             }
                // Start at tail when it exists, has the caller's mode and was not
                // the previous starting point; otherwise start at head.
586             p = (t = tail) != null && t.isData == haveData && t != prevp ? t : h;
587             prevp = p;                      // avoid known self-linked tail path
588             do {
589                 m = p.item;
590                 q = p.next;
                    // p is matchable when it has the opposite mode and is still
                    // unmatched; the CAS from m to e performs the match.
591                 if (p.isData != haveData && haveData != (m != null) &&
592                     p.cmpExItem(m, e) == m) {
593                     Thread w = p.waiter;    // matched complementary node
                        // If the matched node was not head, try to move head to
                        // its successor (or to p itself when p is last).
594                     if (p != h && h == cmpExHead(h, (q == null) ? p : q))
595                         h.next = h;         // advance head; self-link old
596                     LockSupport.unpark(w);
597                     return m;
598                 } else if (q == null) {
                        // p is the last node: this is where an append would go.
599                     if (ns == 0L)           // try to append unless immediate
600                         break restart;
601                     if (s == null)
602                         s = new DualNode(e, haveData);
                        // A null witness means s is now linked after p. Otherwise
                        // q holds the node another thread appended, and the walk
                        // continues from there.
603                     if ((q = p.cmpExNext(null, s)) == null) {
                            // Tail is only moved when the append did not happen
                            // directly at the tail value read above.
604                         if (p != t)
605                             cmpExTail(t, s);
606                         break restart;
607                     }
608                 }
                // Compares the old p with q and then advances p to q; the inner
                // loop ends when they are equal (q == p means p.next points to p).
609             } while (p != (p = q));         // restart if self-linked
610         }
            // No node was enqueued (immediate mode), or the caller asked not to
            // wait (ns <= 0): report "no match" by returning e.
611         if (s == null || ns <= 0L)
612             m = e;                          // don't wait
            // Spinning is enabled when s has no predecessor or the predecessor
            // has not set its waiter field.
613         else if ((m = s.await(e, ns, this,  // spin if at or near head
614                               p == null || p.waiter == null)) == e)
615             unsplice(p, s);                 // cancelled
            // A non-null result different from e: s.item holds a non-null value,
            // so overwrite it with a self-link (see selfLinkItem).
616         else if (m != null)
617             s.selfLinkItem();
618 
619         return m;
620     }
621 
622     /* --------------  Removals -------------- */
623 
624     /**
625      * Unlinks (now or later) the given (non-live) node with given
626      * predecessor. See above for rationale.
627      *
628      * @param pred if nonnull, a node that was at one time known to be the
629      * predecessor of s (else s may have been head)
630      * @param s the node to be unspliced
631      */
unsplice(DualNode pred, DualNode s)632     private void unsplice(DualNode pred, DualNode s) {
633         boolean seen = false; // try removing by collapsing head
634         for (DualNode h = head, p = h, f; p != null;) {
635             boolean matched;
636             if (p == s)
637                 matched = seen = true;
638             else
639                 matched = p.matched();
640             if ((f = p.next) == p)
641                 p = h = head;
642             else if (f != null && matched)
643                 p = f;
644             else {
645                 if (p != h && cmpExHead(h, p) == h)
646                     h.next = h; // self-link
647                 break;
648             }
649         }
650         DualNode sn;      // try to unsplice if not pinned
651         if (!seen &&
652             pred != null && pred.next == s && s != null && (sn = s.next) != s &&
653             (sn == null || pred.cmpExNext(s, sn) != s || pred.matched()) &&
654             sweepNow()) { // occasionally sweep if might not have been removed
655             for (DualNode p = head, f, n, u;
656                  p != null && (f = p.next) != null && (n = f.next) != null;) {
657                 p = (f == p                       ? head :  // stale
658                      !f.matched()                 ? f :     // skip
659                      f == (u = p.cmpExNext(f, n)) ? n : u); // unspliced
660             }
661         }
662     }
663 
664     /**
665      * Tries to CAS pred.next (or head, if pred is null) from c to p.
666      * Caller must ensure that we're not unlinking the trailing node.
667      */
tryCasSuccessor(DualNode pred, DualNode c, DualNode p)668     final boolean tryCasSuccessor(DualNode pred, DualNode c, DualNode p) {
669         // assert p != null && c.matched() && c != p;
670         if (pred != null)
671             return pred.cmpExNext(c, p) == c;
672         else if (cmpExHead(c, p) != c)
673             return false;
674         if (c != null)
675             c.next = c;
676 
677         return true;
678     }
679 
680     /**
681      * Collapses dead (matched) nodes between pred and q.
682      * @param pred the last known live node, or null if none
683      * @param c the first dead node
684      * @param p the last dead node
685      * @param q p.next: the next live node, or null if at end
686      * @return pred if pred still alive and CAS succeeded; else p
687      */
skipDeadNodes(DualNode pred, DualNode c, DualNode p, DualNode q)688     final DualNode skipDeadNodes(DualNode pred, DualNode c,
689                                  DualNode p, DualNode q) {
690         // assert pred != c && p != q; && c.matched() && p.matched();
691         if (q == null) { // Never unlink trailing node.
692             if (c == p)
693                 return pred;
694             q = p;
695         }
696         return (tryCasSuccessor(pred, c, q) && (pred == null || !pred.matched()))
697             ? pred : p;
698     }
699 
700     /**
701      * Tries to match the given object only if p is a data
702      * node. Signals waiter on success.
703      */
tryMatchData(DualNode p, Object x)704     final boolean tryMatchData(DualNode p, Object x) {
705         if (p != null && p.isData &&
706             x != null && p.cmpExItem(x, null) == x) {
707             LockSupport.unpark(p.waiter);
708             return true;
709         }
710         return false;
711     }
712 
713     /* -------------- Traversal methods -------------- */
714 
715     /**
716      * Returns the first unmatched data node, or null if none.
717      * Callers must recheck if the returned node is unmatched
718      * before using.
719      */
firstDataNode()720     final DualNode firstDataNode() {
721         for (DualNode h = head, p = h, q, u; p != null;) {
722             boolean isData = p.isData;
723             Object item = p.item;
724             if (isData && item != null)       // is live data
725                 return p;
726             else if (!isData && item == null) // is live request
727                 break;
728             else if ((q = p.next) == null)    // end of list
729                 break;
730             else if (p == q)                  // self-link; restart
731                 p = h = head;
732             else if (p == h)                  // traverse past header
733                 p = q;
734             else if ((u = cmpExHead(h, q)) != h)
735                 p = h = u;                    // lost update race
736             else {
737                 h.next = h;                   // collapse; self-link
738                 p = h = q;
739             }
740         }
741         return null;
742     }
743 
744     /**
745      * Traverses and counts unmatched nodes of the given mode.
746      * Used by methods size and getWaitingConsumerCount.
747      */
countOfMode(boolean data)748     final int countOfMode(boolean data) {
749         restartFromHead: for (;;) {
750             int count = 0;
751             for (DualNode p = head; p != null;) {
752                 if (!p.matched()) {
753                     if (p.isData != data)
754                         return 0;
755                     if (++count == Integer.MAX_VALUE)
756                         break;  // @see Collection.size()
757                 }
758                 if (p == (p = p.next))
759                     continue restartFromHead;
760             }
761             return count;
762         }
763     }
764 
toString()765     public String toString() {
766         String[] a = null;
767         restartFromHead: for (;;) {
768             int charLength = 0;
769             int size = 0;
770             for (DualNode p = head; p != null;) {
771                 Object item = p.item;
772                 if (p.isData) {
773                     if (item != null) {
774                         if (a == null)
775                             a = new String[4];
776                         else if (size == a.length)
777                             a = Arrays.copyOf(a, 2 * size);
778                         String s = item.toString();
779                         a[size++] = s;
780                         charLength += s.length();
781                     }
782                 } else if (item == null)
783                     break;
784                 if (p == (p = p.next))
785                     continue restartFromHead;
786             }
787 
788             if (size == 0)
789                 return "[]";
790 
791             return Helpers.toString(a, size, charLength);
792         }
793     }
794 
toArrayInternal(Object[] a)795     private Object[] toArrayInternal(Object[] a) {
796         Object[] x = a;
797         restartFromHead: for (;;) {
798             int size = 0;
799             for (DualNode p = head; p != null;) {
800                 Object item = p.item;
801                 if (p.isData) {
802                     if (item != null) {
803                         if (x == null)
804                             x = new Object[4];
805                         else if (size == x.length)
806                             x = Arrays.copyOf(x, 2 * (size + 4));
807                         x[size++] = item;
808                     }
809                 } else if (item == null)
810                     break;
811                 if (p == (p = p.next))
812                     continue restartFromHead;
813             }
814             if (x == null)
815                 return new Object[0];
816             else if (a != null && size <= a.length) {
817                 if (a != x)
818                     System.arraycopy(x, 0, a, 0, size);
819                 if (size < a.length)
820                     a[size] = null;
821                 return a;
822             }
823             return (size == x.length) ? x : Arrays.copyOf(x, size);
824         }
825     }
826 
827     /**
828      * Returns an array containing all of the elements in this queue, in
829      * proper sequence.
830      *
831      * <p>The returned array will be "safe" in that no references to it are
832      * maintained by this queue.  (In other words, this method must allocate
833      * a new array).  The caller is thus free to modify the returned array.
834      *
835      * <p>This method acts as bridge between array-based and collection-based
836      * APIs.
837      *
838      * @return an array containing all of the elements in this queue
839      */
toArray()840     public Object[] toArray() {
841         return toArrayInternal(null);
842     }
843 
844     /**
845      * Returns an array containing all of the elements in this queue, in
846      * proper sequence; the runtime type of the returned array is that of
847      * the specified array.  If the queue fits in the specified array, it
848      * is returned therein.  Otherwise, a new array is allocated with the
849      * runtime type of the specified array and the size of this queue.
850      *
851      * <p>If this queue fits in the specified array with room to spare
852      * (i.e., the array has more elements than this queue), the element in
853      * the array immediately following the end of the queue is set to
854      * {@code null}.
855      *
856      * <p>Like the {@link #toArray()} method, this method acts as bridge between
857      * array-based and collection-based APIs.  Further, this method allows
858      * precise control over the runtime type of the output array, and may,
859      * under certain circumstances, be used to save allocation costs.
860      *
861      * <p>Suppose {@code x} is a queue known to contain only strings.
862      * The following code can be used to dump the queue into a newly
863      * allocated array of {@code String}:
864      *
865      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
866      *
867      * Note that {@code toArray(new Object[0])} is identical in function to
868      * {@code toArray()}.
869      *
870      * @param a the array into which the elements of the queue are to
871      *          be stored, if it is big enough; otherwise, a new array of the
872      *          same runtime type is allocated for this purpose
873      * @return an array containing all of the elements in this queue
874      * @throws ArrayStoreException if the runtime type of the specified array
875      *         is not a supertype of the runtime type of every element in
876      *         this queue
877      * @throws NullPointerException if the specified array is null
878      */
879     @SuppressWarnings("unchecked")
toArray(T[] a)880     public <T> T[] toArray(T[] a) {
881         Objects.requireNonNull(a);
882         return (T[]) toArrayInternal(a);
883     }
884 
885     /**
886      * Weakly-consistent iterator.
887      *
888      * Lazily updated ancestor is expected to be amortized O(1) remove(),
889      * but O(n) in the worst case, when lastRet is concurrently deleted.
890      */
891     final class Itr implements Iterator<E> {
892         private DualNode nextNode;   // next node to return item for
893         private E nextItem;          // the corresponding item
894         private DualNode lastRet;    // last returned node, to support remove
895         private DualNode ancestor;   // Helps unlink lastRet on remove()
896 
897         /**
898          * Moves to next node after pred, or first node if pred null.
899          */
900         @SuppressWarnings("unchecked")
advance(DualNode pred)901         private void advance(DualNode pred) {
902             for (DualNode p = (pred == null) ? head : pred.next, c = p;
903                  p != null; ) {
904                 boolean isData = p.isData;
905                 Object item = p.item;
906                 if (isData && item != null) {
907                     nextNode = p;
908                     nextItem = (E) item;
909                     if (c != p)
910                         tryCasSuccessor(pred, c, p);
911                     return;
912                 }
913                 else if (!isData && item == null)
914                     break;
915                 if (c != p && !tryCasSuccessor(pred, c, c = p)) {
916                     pred = p;
917                     c = p = p.next;
918                 }
919                 else if (p == (p = p.next)) {
920                     pred = null;
921                     c = p = head;
922                 }
923             }
924             nextItem = null;
925             nextNode = null;
926         }
927 
Itr()928         Itr() {
929             advance(null);
930         }
931 
hasNext()932         public final boolean hasNext() {
933             return nextNode != null;
934         }
935 
next()936         public final E next() {
937             DualNode p;
938             if ((p = nextNode) == null) throw new NoSuchElementException();
939             E e = nextItem;
940             advance(lastRet = p);
941             return e;
942         }
943 
forEachRemaining(Consumer<? super E> action)944         public void forEachRemaining(Consumer<? super E> action) {
945             Objects.requireNonNull(action);
946             DualNode q = null;
947             for (DualNode p; (p = nextNode) != null; advance(q = p))
948                 action.accept(nextItem);
949             if (q != null)
950                 lastRet = q;
951         }
952 
remove()953         public final void remove() {
954             final DualNode lastRet = this.lastRet;
955             if (lastRet == null)
956                 throw new IllegalStateException();
957             this.lastRet = null;
958             if (lastRet.item == null)   // already deleted?
959                 return;
960             // Advance ancestor, collapsing intervening dead nodes
961             DualNode pred = ancestor;
962             for (DualNode p = (pred == null) ? head : pred.next, c = p, q;
963                  p != null; ) {
964                 if (p == lastRet) {
965                     tryMatchData(p, p.item);
966                     if ((q = p.next) == null) q = p;
967                     if (c != q) tryCasSuccessor(pred, c, q);
968                     ancestor = pred;
969                     return;
970                 }
971                 final Object item; final boolean pAlive;
972                 if (pAlive = ((item = p.item) != null && p.isData)) {
973                     // exceptionally, nothing to do
974                 }
975                 else if (!p.isData && item == null)
976                     break;
977                 if ((c != p && !tryCasSuccessor(pred, c, c = p)) || pAlive) {
978                     pred = p;
979                     c = p = p.next;
980                 }
981                 else if (p == (p = p.next)) {
982                     pred = null;
983                     c = p = head;
984                 }
985             }
986             // traversal failed to find lastRet; must have been deleted;
987             // leave ancestor at original location to avoid overshoot;
988             // better luck next time!
989 
990             // assert lastRet.matched();
991         }
992     }
993 
994     /** A customized variant of Spliterators.IteratorSpliterator */
995     final class LTQSpliterator implements Spliterator<E> {
996         static final int MAX_BATCH = 1 << 25;  // max batch array size;
997         DualNode current;   // current node; null until initialized
998         int batch;          // batch size for splits
999         boolean exhausted;  // true when no more nodes
LTQSpliterator()1000         LTQSpliterator() {}
1001 
trySplit()1002         public Spliterator<E> trySplit() {
1003             DualNode p, q;
1004             if ((p = current()) == null || (q = p.next) == null)
1005                 return null;
1006             int i = 0, n = batch = Math.min(batch + 1, MAX_BATCH);
1007             Object[] a = null;
1008             do {
1009                 final Object item = p.item;
1010                 if (p.isData) {
1011                     if (item != null) {
1012                         if (a == null)
1013                             a = new Object[n];
1014                         a[i++] = item;
1015                     }
1016                 } else if (item == null) {
1017                     p = null;
1018                     break;
1019                 }
1020                 if (p == (p = q))
1021                     p = firstDataNode();
1022             } while (p != null && (q = p.next) != null && i < n);
1023             setCurrent(p);
1024             return (i == 0) ? null :
1025                 Spliterators.spliterator(a, 0, i, (Spliterator.ORDERED |
1026                                                    Spliterator.NONNULL |
1027                                                    Spliterator.CONCURRENT));
1028         }
1029 
forEachRemaining(Consumer<? super E> action)1030         public void forEachRemaining(Consumer<? super E> action) {
1031             Objects.requireNonNull(action);
1032             final DualNode p;
1033             if ((p = current()) != null) {
1034                 current = null;
1035                 exhausted = true;
1036                 forEachFrom(action, p);
1037             }
1038         }
1039 
1040         @SuppressWarnings("unchecked")
tryAdvance(Consumer<? super E> action)1041         public boolean tryAdvance(Consumer<? super E> action) {
1042             Objects.requireNonNull(action);
1043             DualNode p;
1044             if ((p = current()) != null) {
1045                 E e = null;
1046                 do {
1047                     boolean isData = p.isData;
1048                     Object item = p.item;
1049                     if (p == (p = p.next))
1050                         p = head;
1051                     if (isData) {
1052                         if (item != null) {
1053                             e = (E) item;
1054                             break;
1055                         }
1056                     }
1057                     else if (item == null)
1058                         p = null;
1059                 } while (p != null);
1060                 setCurrent(p);
1061                 if (e != null) {
1062                     action.accept(e);
1063                     return true;
1064                 }
1065             }
1066             return false;
1067         }
1068 
setCurrent(DualNode p)1069         private void setCurrent(DualNode p) {
1070             if ((current = p) == null)
1071                 exhausted = true;
1072         }
1073 
current()1074         private DualNode current() {
1075             DualNode p;
1076             if ((p = current) == null && !exhausted)
1077                 setCurrent(p = firstDataNode());
1078             return p;
1079         }
1080 
estimateSize()1081         public long estimateSize() { return Long.MAX_VALUE; }
1082 
characteristics()1083         public int characteristics() {
1084             return (Spliterator.ORDERED |
1085                     Spliterator.NONNULL |
1086                     Spliterator.CONCURRENT);
1087         }
1088     }
1089 
1090     /**
1091      * Returns a {@link Spliterator} over the elements in this queue.
1092      *
1093      * <p>The returned spliterator is
1094      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1095      *
1096      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1097      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1098      *
1099      * @implNote
1100      * The {@code Spliterator} implements {@code trySplit} to permit limited
1101      * parallelism.
1102      *
1103      * @return a {@code Spliterator} over the elements in this queue
1104      * @since 1.8
1105      */
spliterator()1106     public Spliterator<E> spliterator() {
1107         return new LTQSpliterator();
1108     }
1109 
1110     /**
1111      * Creates an initially empty {@code LinkedTransferQueue}.
1112      */
LinkedTransferQueue()1113     public LinkedTransferQueue() {
1114     }
1115 
1116     /**
1117      * Creates a {@code LinkedTransferQueue}
1118      * initially containing the elements of the given collection,
1119      * added in traversal order of the collection's iterator.
1120      *
1121      * @param c the collection of elements to initially contain
1122      * @throws NullPointerException if the specified collection or any
1123      *         of its elements are null
1124      */
LinkedTransferQueue(Collection<? extends E> c)1125     public LinkedTransferQueue(Collection<? extends E> c) {
1126         DualNode h = null, t = null;
1127         for (E e : c) {
1128             DualNode newNode = new DualNode(Objects.requireNonNull(e), true);
1129             if (t == null)
1130                 h = newNode;
1131             else
1132                 t.next = newNode;
1133             t = newNode;
1134         }
1135         head = h;
1136         tail = t;
1137     }
1138 
1139     /**
1140      * Inserts the specified element at the tail of this queue.
1141      * As the queue is unbounded, this method will never block.
1142      *
1143      * @throws NullPointerException if the specified element is null
1144      */
put(E e)1145     public void put(E e) {
1146         Objects.requireNonNull(e);
1147         xfer(e, -1L);
1148     }
1149 
1150     /**
1151      * Inserts the specified element at the tail of this queue.
1152      * As the queue is unbounded, this method will never block or
1153      * return {@code false}.
1154      *
1155      * @return {@code true} (as specified by
1156      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
1157      * @throws NullPointerException if the specified element is null
1158      */
offer(E e, long timeout, TimeUnit unit)1159     public boolean offer(E e, long timeout, TimeUnit unit) {
1160         Objects.requireNonNull(e);
1161         xfer(e, -1L);
1162         return true;
1163     }
1164 
1165     /**
1166      * Inserts the specified element at the tail of this queue.
1167      * As the queue is unbounded, this method will never return {@code false}.
1168      *
1169      * @return {@code true} (as specified by {@link Queue#offer})
1170      * @throws NullPointerException if the specified element is null
1171      */
offer(E e)1172     public boolean offer(E e) {
1173         Objects.requireNonNull(e);
1174         xfer(e, -1L);
1175         return true;
1176     }
1177 
1178     /**
1179      * Inserts the specified element at the tail of this queue.
1180      * As the queue is unbounded, this method will never throw
1181      * {@link IllegalStateException} or return {@code false}.
1182      *
1183      * @return {@code true} (as specified by {@link Collection#add})
1184      * @throws NullPointerException if the specified element is null
1185      */
add(E e)1186     public boolean add(E e) {
1187         Objects.requireNonNull(e);
1188         xfer(e, -1L);
1189         return true;
1190     }
1191 
1192     /**
1193      * Transfers the element to a waiting consumer immediately, if possible.
1194      *
1195      * <p>More precisely, transfers the specified element immediately
1196      * if there exists a consumer already waiting to receive it (in
1197      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1198      * otherwise returning {@code false} without enqueuing the element.
1199      *
1200      * @throws NullPointerException if the specified element is null
1201      */
tryTransfer(E e)1202     public boolean tryTransfer(E e) {
1203         Objects.requireNonNull(e);
1204         return xfer(e, 0L) == null;
1205     }
1206 
1207     /**
1208      * Transfers the element to a consumer, waiting if necessary to do so.
1209      *
1210      * <p>More precisely, transfers the specified element immediately
1211      * if there exists a consumer already waiting to receive it (in
1212      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1213      * else inserts the specified element at the tail of this queue
1214      * and waits until the element is received by a consumer.
1215      *
1216      * @throws NullPointerException if the specified element is null
1217      */
transfer(E e)1218     public void transfer(E e) throws InterruptedException {
1219         Objects.requireNonNull(e);
1220         if (!Thread.interrupted()) {
1221             if (xfer(e, Long.MAX_VALUE) == null)
1222                 return;
1223             Thread.interrupted(); // failure possible only due to interrupt
1224         }
1225         throw new InterruptedException();
1226     }
1227 
1228     /**
1229      * Transfers the element to a consumer if it is possible to do so
1230      * before the timeout elapses.
1231      *
1232      * <p>More precisely, transfers the specified element immediately
1233      * if there exists a consumer already waiting to receive it (in
1234      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1235      * else inserts the specified element at the tail of this queue
1236      * and waits until the element is received by a consumer,
1237      * returning {@code false} if the specified wait time elapses
1238      * before the element can be transferred.
1239      *
1240      * @throws NullPointerException if the specified element is null
1241      */
tryTransfer(E e, long timeout, TimeUnit unit)1242     public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1243         throws InterruptedException {
1244         Objects.requireNonNull(e);
1245         long nanos = Math.max(unit.toNanos(timeout), 0L);
1246         if (xfer(e, nanos) == null)
1247             return true;
1248         if (!Thread.interrupted())
1249             return false;
1250         throw new InterruptedException();
1251     }
1252 
1253     @SuppressWarnings("unchecked")
take()1254     public E take() throws InterruptedException {
1255         Object e;
1256         if (!Thread.interrupted()) {
1257             if ((e = xfer(null, Long.MAX_VALUE)) != null)
1258                 return (E) e;
1259             Thread.interrupted();
1260         }
1261         throw new InterruptedException();
1262     }
1263 
1264     @SuppressWarnings("unchecked")
poll(long timeout, TimeUnit unit)1265     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1266         Object e;
1267         long nanos = Math.max(unit.toNanos(timeout), 0L);
1268         if ((e = xfer(null, nanos)) != null || !Thread.interrupted())
1269             return (E) e;
1270         throw new InterruptedException();
1271     }
1272 
1273     @SuppressWarnings("unchecked")
poll()1274     public E poll() {
1275         return (E) xfer(null, 0L);
1276     }
1277 
1278     /**
1279      * @throws NullPointerException     {@inheritDoc}
1280      * @throws IllegalArgumentException {@inheritDoc}
1281      */
drainTo(Collection<? super E> c)1282     public int drainTo(Collection<? super E> c) {
1283         Objects.requireNonNull(c);
1284         if (c == this)
1285             throw new IllegalArgumentException();
1286         int n = 0;
1287         for (E e; (e = poll()) != null; n++)
1288             c.add(e);
1289         return n;
1290     }
1291 
1292     /**
1293      * @throws NullPointerException     {@inheritDoc}
1294      * @throws IllegalArgumentException {@inheritDoc}
1295      */
drainTo(Collection<? super E> c, int maxElements)1296     public int drainTo(Collection<? super E> c, int maxElements) {
1297         Objects.requireNonNull(c);
1298         if (c == this)
1299             throw new IllegalArgumentException();
1300         int n = 0;
1301         for (E e; n < maxElements && (e = poll()) != null; n++)
1302             c.add(e);
1303         return n;
1304     }
1305 
1306     /**
1307      * Returns an iterator over the elements in this queue in proper sequence.
1308      * The elements will be returned in order from first (head) to last (tail).
1309      *
1310      * <p>The returned iterator is
1311      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1312      *
1313      * @return an iterator over the elements in this queue in proper sequence
1314      */
iterator()1315     public Iterator<E> iterator() {
1316         return new Itr();
1317     }
1318 
peek()1319     public E peek() {
1320         restartFromHead: for (;;) {
1321             for (DualNode p = head; p != null;) {
1322                 Object item = p.item;
1323                 if (p.isData) {
1324                     if (item != null) {
1325                         @SuppressWarnings("unchecked") E e = (E) item;
1326                         return e;
1327                     }
1328                 }
1329                 else if (item == null)
1330                     break;
1331                 if (p == (p = p.next))
1332                     continue restartFromHead;
1333             }
1334             return null;
1335         }
1336     }
1337 
1338     /**
1339      * Returns {@code true} if this queue contains no elements.
1340      *
1341      * @return {@code true} if this queue contains no elements
1342      */
isEmpty()1343     public boolean isEmpty() {
1344         return firstDataNode() == null;
1345     }
1346 
hasWaitingConsumer()1347     public boolean hasWaitingConsumer() {
1348         restartFromHead: for (;;) {
1349             for (DualNode p = head; p != null;) {
1350                 Object item = p.item;
1351                 if (p.isData) {
1352                     if (item != null)
1353                         break;
1354                 }
1355                 else if (item == null)
1356                     return true;
1357                 if (p == (p = p.next))
1358                     continue restartFromHead;
1359             }
1360             return false;
1361         }
1362     }
1363 
1364     /**
1365      * Returns the number of elements in this queue.  If this queue
1366      * contains more than {@code Integer.MAX_VALUE} elements, returns
1367      * {@code Integer.MAX_VALUE}.
1368      *
1369      * <p>Beware that, unlike in most collections, this method is
1370      * <em>NOT</em> a constant-time operation. Because of the
1371      * asynchronous nature of these queues, determining the current
1372      * number of elements requires an O(n) traversal.
1373      *
1374      * @return the number of elements in this queue
1375      */
size()1376     public int size() {
1377         return countOfMode(true);
1378     }
1379 
getWaitingConsumerCount()1380     public int getWaitingConsumerCount() {
1381         return countOfMode(false);
1382     }
1383 
1384     /**
1385      * Removes a single instance of the specified element from this queue,
1386      * if it is present.  More formally, removes an element {@code e} such
1387      * that {@code o.equals(e)}, if this queue contains one or more such
1388      * elements.
1389      * Returns {@code true} if this queue contained the specified element
1390      * (or equivalently, if this queue changed as a result of the call).
1391      *
1392      * @param o element to be removed from this queue, if present
1393      * @return {@code true} if this queue changed as a result of the call
1394      */
remove(Object o)1395     public boolean remove(Object o) {
1396         if (o == null) return false;
1397         restartFromHead: for (;;) {
1398             for (DualNode p = head, pred = null; p != null; ) {
1399                 boolean isData = p.isData;
1400                 Object item = p.item;
1401                 DualNode q = p.next;
1402                 if (item != null) {
1403                     if (isData) {
1404                         if (o.equals(item) && tryMatchData(p, item)) {
1405                             skipDeadNodes(pred, p, p, q);
1406                             return true;
1407                         }
1408                         pred = p; p = q; continue;
1409                     }
1410                 }
1411                 else if (!isData)
1412                     break;
1413                 for (DualNode c = p;; q = p.next) {
1414                     if (q == null || !q.matched()) {
1415                         pred = skipDeadNodes(pred, c, p, q); p = q; break;
1416                     }
1417                     if (p == (p = q)) continue restartFromHead;
1418                 }
1419             }
1420             return false;
1421         }
1422     }
1423 
1424     /**
1425      * Returns {@code true} if this queue contains the specified element.
1426      * More formally, returns {@code true} if and only if this queue contains
1427      * at least one element {@code e} such that {@code o.equals(e)}.
1428      *
1429      * @param o object to be checked for containment in this queue
1430      * @return {@code true} if this queue contains the specified element
1431      */
contains(Object o)1432     public boolean contains(Object o) {
1433         if (o == null) return false;
1434         restartFromHead: for (;;) {
1435             for (DualNode p = head, pred = null; p != null; ) {
1436                 boolean isData = p.isData;
1437                 Object item = p.item;
1438                 DualNode q = p.next;
1439                 if (item != null) {
1440                     if (isData) {
1441                         if (o.equals(item))
1442                             return true;
1443                         pred = p; p = q; continue;
1444                     }
1445                 }
1446                 else if (!isData)
1447                     break;
1448                 for (DualNode c = p;; q = p.next) {
1449                     if (q == null || !q.matched()) {
1450                         pred = skipDeadNodes(pred, c, p, q); p = q; break;
1451                     }
1452                     if (p == (p = q)) continue restartFromHead;
1453                 }
1454             }
1455             return false;
1456         }
1457     }
1458 
1459     /**
1460      * Always returns {@code Integer.MAX_VALUE} because a
1461      * {@code LinkedTransferQueue} is not capacity constrained.
1462      *
1463      * @return {@code Integer.MAX_VALUE} (as specified by
1464      *         {@link BlockingQueue#remainingCapacity()})
1465      */
remainingCapacity()1466     public int remainingCapacity() {
1467         return Integer.MAX_VALUE;
1468     }
1469 
1470     /**
1471      * Saves this queue to a stream (that is, serializes it).
1472      *
1473      * @param s the stream
1474      * @throws java.io.IOException if an I/O error occurs
1475      * @serialData All of the elements (each an {@code E}) in
1476      * the proper order, followed by a null
1477      */
writeObject(java.io.ObjectOutputStream s)1478     private void writeObject(java.io.ObjectOutputStream s)
1479         throws java.io.IOException {
1480         s.defaultWriteObject();
1481         for (E e : this)
1482             s.writeObject(e);
1483         // Use trailing null as sentinel
1484         s.writeObject(null);
1485     }
1486 
1487     /**
1488      * Reconstitutes this queue from a stream (that is, deserializes it).
1489      * @param s the stream
1490      * @throws ClassNotFoundException if the class of a serialized object
1491      *         could not be found
1492      * @throws java.io.IOException if an I/O error occurs
1493      */
readObject(java.io.ObjectInputStream s)1494     private void readObject(java.io.ObjectInputStream s)
1495         throws java.io.IOException, ClassNotFoundException {
1496 
1497         // Read in elements until trailing null sentinel found
1498         DualNode h = null, t = null;
1499         for (Object item; (item = s.readObject()) != null; ) {
1500             DualNode newNode = new DualNode(item, true);
1501             if (t == null)
1502                 h = newNode;
1503             else
1504                 t.next = newNode;
1505             t = newNode;
1506         }
1507         head = h;
1508         tail = t;
1509     }
1510 
1511     /**
1512      * @throws NullPointerException {@inheritDoc}
1513      */
removeIf(Predicate<? super E> filter)1514     public boolean removeIf(Predicate<? super E> filter) {
1515         Objects.requireNonNull(filter);
1516         return bulkRemove(filter);
1517     }
1518 
1519     /**
1520      * @throws NullPointerException {@inheritDoc}
1521      */
removeAll(Collection<?> c)1522     public boolean removeAll(Collection<?> c) {
1523         Objects.requireNonNull(c);
1524         return bulkRemove(e -> c.contains(e));
1525     }
1526 
1527     /**
1528      * @throws NullPointerException {@inheritDoc}
1529      */
retainAll(Collection<?> c)1530     public boolean retainAll(Collection<?> c) {
1531         Objects.requireNonNull(c);
1532         return bulkRemove(e -> !c.contains(e));
1533     }
1534 
clear()1535     public void clear() {
1536         bulkRemove(e -> true);
1537     }
1538 
1539     /**
1540      * Tolerate this many consecutive dead nodes before CAS-collapsing.
1541      * Amortized cost of clear() is (1 + 1/MAX_HOPS) CASes per element.
1542      */
1543     private static final int MAX_HOPS = 8;
1544 
1545 
1546     /** Implementation of bulk remove methods. */
1547     @SuppressWarnings("unchecked")
bulkRemove(Predicate<? super E> filter)1548     private boolean bulkRemove(Predicate<? super E> filter) {
1549         boolean removed = false;
1550         restartFromHead: for (;;) {
1551             int hops = MAX_HOPS;
1552             // c will be CASed to collapse intervening dead nodes between
1553             // pred (or head if null) and p.
1554             for (DualNode p = head, c = p, pred = null, q; p != null; p = q) {
1555                 boolean isData = p.isData, pAlive;
1556                 Object item = p.item;
1557                 q = p.next;
1558                 if (pAlive = (item != null && isData)) {
1559                     if (filter.test((E) item)) {
1560                         if (tryMatchData(p, item))
1561                             removed = true;
1562                         pAlive = false;
1563                     }
1564                 }
1565                 else if (!isData && item == null)
1566                     break;
1567                 if (pAlive || q == null || --hops == 0) {
1568                     // p might already be self-linked here, but if so:
1569                     // - CASing head will surely fail
1570                     // - CASing pred's next will be useless but harmless.
1571                     if ((c != p && !tryCasSuccessor(pred, c, c = p)) || pAlive) {
1572                         // if CAS failed or alive, abandon old pred
1573                         hops = MAX_HOPS;
1574                         pred = p;
1575                         c = q;
1576                     }
1577                 } else if (p == q)
1578                     continue restartFromHead;
1579             }
1580             return removed;
1581         }
1582     }
1583 
1584     /**
1585      * Runs action on each element found during a traversal starting at p.
1586      * If p is null, the action is not run.
1587      */
1588     @SuppressWarnings("unchecked")
forEachFrom(Consumer<? super E> action, DualNode p)1589     void forEachFrom(Consumer<? super E> action, DualNode p) {
1590         for (DualNode pred = null; p != null; ) {
1591             boolean isData = p.isData;
1592             Object item = p.item;
1593             DualNode q = p.next;
1594             if (item != null) {
1595                 if (isData) {
1596                     action.accept((E) item);
1597                     pred = p; p = q; continue;
1598                 }
1599             }
1600             else if (!isData)
1601                 break;
1602             for (DualNode c = p;; q = p.next) {
1603                 if (q == null || !q.matched()) {
1604                     pred = skipDeadNodes(pred, c, p, q); p = q; break;
1605                 }
1606                 if (p == (p = q)) { pred = null; p = head; break; }
1607             }
1608         }
1609     }
1610 
1611     /**
1612      * @throws NullPointerException {@inheritDoc}
1613      */
forEach(Consumer<? super E> action)1614     public void forEach(Consumer<? super E> action) {
1615         Objects.requireNonNull(action);
1616         forEachFrom(action, head);
1617     }
1618 
1619     // VarHandle mechanics
1620     static final VarHandle HEAD;
1621     static final VarHandle TAIL;
1622     static final VarHandle SWEEPVOTES;
1623     static {
1624         try {
1625             Class<?> ltq = LinkedTransferQueue.class, tn = DualNode.class;
1626             MethodHandles.Lookup l = MethodHandles.lookup();
1627             HEAD = l.findVarHandle(ltq, "head", tn);
1628             TAIL = l.findVarHandle(ltq, "tail", tn);
1629             SWEEPVOTES = l.findVarHandle(ltq, "sweepVotes", int.class);
1630         } catch (ReflectiveOperationException e) {
1631             throw new ExceptionInInitializerError(e);
1632         }
1633     }
1634 }
1635