• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea, Bill Scherer, and Michael Scott with
32  * assistance from members of JCP JSR-166 Expert Group and released to
33  * the public domain, as explained at
34  * http://creativecommons.org/publicdomain/zero/1.0/
35  */
36 
37 package java.util.concurrent;
38 
39 import java.lang.invoke.MethodHandles;
40 import java.lang.invoke.VarHandle;
41 import java.util.AbstractQueue;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.Iterator;
45 import java.util.Objects;
46 import java.util.Spliterator;
47 import java.util.Spliterators;
48 import java.util.concurrent.locks.LockSupport;
49 import java.util.concurrent.locks.ReentrantLock;
50 
51 /**
52  * A {@linkplain BlockingQueue blocking queue} in which each insert
53  * operation must wait for a corresponding remove operation by another
54  * thread, and vice versa.  A synchronous queue does not have any
55  * internal capacity, not even a capacity of one.  You cannot
56  * {@code peek} at a synchronous queue because an element is only
57  * present when you try to remove it; you cannot insert an element
58  * (using any method) unless another thread is trying to remove it;
59  * you cannot iterate as there is nothing to iterate.  The
60  * <em>head</em> of the queue is the element that the first queued
61  * inserting thread is trying to add to the queue; if there is no such
62  * queued thread then no element is available for removal and
63  * {@code poll()} will return {@code null}.  For purposes of other
64  * {@code Collection} methods (for example {@code contains}), a
65  * {@code SynchronousQueue} acts as an empty collection.  This queue
66  * does not permit {@code null} elements.
67  *
68  * <p>Synchronous queues are similar to rendezvous channels used in
69  * CSP and Ada. They are well suited for handoff designs, in which an
70  * object running in one thread must sync up with an object running
71  * in another thread in order to hand it some information, event, or
72  * task.
73  *
74  * <p>This class supports an optional fairness policy for ordering
75  * waiting producer and consumer threads.  By default, this ordering
76  * is not guaranteed. However, a queue constructed with fairness set
77  * to {@code true} grants threads access in FIFO order.
78  *
79  * <p>This class and its iterator implement all of the <em>optional</em>
80  * methods of the {@link Collection} and {@link Iterator} interfaces.
81  *
82  * <p>This class is a member of the
83  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
84  * Java Collections Framework</a>.
85  *
86  * @since 1.5
87  * @author Doug Lea and Bill Scherer and Michael Scott
88  * @param <E> the type of elements held in this queue
89  */
90 public class SynchronousQueue<E> extends AbstractQueue<E>
91     implements BlockingQueue<E>, java.io.Serializable {
92     private static final long serialVersionUID = -3223113410248163686L;
93 
94     /*
95      * This class implements extensions of the dual stack and dual
96      * queue algorithms described in "Nonblocking Concurrent Objects
97      * with Condition Synchronization", by W. N. Scherer III and
98      * M. L. Scott.  18th Annual Conf. on Distributed Computing,
99      * Oct. 2004 (see also
100      * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
101      * The (Lifo) stack is used for non-fair mode, and the (Fifo)
102      * queue for fair mode. The performance of the two is generally
103      * similar. Fifo usually supports higher throughput under
104      * contention but Lifo maintains higher thread locality in common
105      * applications.
106      *
107      * A dual queue (and similarly stack) is one that at any given
108      * time either holds "data" -- items provided by put operations,
109      * or "requests" -- slots representing take operations, or is
110      * empty. A call to "fulfill" (i.e., a call requesting an item
111      * from a queue holding data or vice versa) dequeues a
112      * complementary node.  The most interesting feature of these
113      * queues is that any operation can figure out which mode the
114      * queue is in, and act accordingly without needing locks.
115      *
116      * Both the queue and stack extend abstract class Transferer
117      * defining the single method transfer that does a put or a
118      * take. These are unified into a single method because in dual
119      * data structures, the put and take operations are symmetrical,
120      * so nearly all code can be combined. The resulting transfer
121      * methods are on the long side, but are easier to follow than
122      * they would be if broken up into nearly-duplicated parts.
123      *
124      * The queue and stack data structures share many conceptual
125      * similarities but very few concrete details. For simplicity,
126      * they are kept distinct so that they can later evolve
127      * separately.
128      *
129      * The algorithms here differ from the versions in the above paper
130      * in extending them for use in synchronous queues, as well as
131      * dealing with cancellation. The main differences include:
132      *
133      *  1. The original algorithms used bit-marked pointers, but
134      *     the ones here use mode bits in nodes, leading to a number
135      *     of further adaptations.
136      *  2. SynchronousQueues must block threads waiting to become
137      *     fulfilled.
138      *  3. Support for cancellation via timeout and interrupts,
139      *     including cleaning out cancelled nodes/threads
140      *     from lists to avoid garbage retention and memory depletion.
141      *
142      * Blocking is mainly accomplished using LockSupport park/unpark,
143      * except that nodes that appear to be the next ones to become
144      * fulfilled first spin a bit (on multiprocessors only). On very
145      * busy synchronous queues, spinning can dramatically improve
146      * throughput. And on less busy ones, the amount of spinning is
147      * small enough not to be noticeable.
148      *
149      * Cleaning is done in different ways in queues vs stacks.  For
150      * queues, we can almost always remove a node immediately in O(1)
151      * time (modulo retries for consistency checks) when it is
152      * cancelled. But if it may be pinned as the current tail, it must
153      * wait until some subsequent cancellation. For stacks, we need a
154      * potentially O(n) traversal to be sure that we can remove the
155      * node, but this can run concurrently with other threads
156      * accessing the stack.
157      *
158      * While garbage collection takes care of most node reclamation
159      * issues that otherwise complicate nonblocking algorithms, care
160      * is taken to "forget" references to data, other nodes, and
161      * threads that might be held on to long-term by blocked
162      * threads. In cases where setting to null would otherwise
163      * conflict with main algorithms, this is done by changing a
164      * node's link to now point to the node itself. This doesn't arise
165      * much for Stack nodes (because blocked threads do not hang on to
166      * old head pointers), but references in Queue nodes must be
167      * aggressively forgotten to avoid reachability of everything any
168      * node has ever referred to since arrival.
169      */
170 
    /**
     * Shared internal API for dual stacks and queues. Both
     * {@code TransferStack} and {@code TransferQueue} extend this class
     * and implement its single {@code transfer} method.
     *
     * @param <E> the type of item handed from producer to consumer
     */
    abstract static class Transferer<E> {
        /**
         * Performs a put or take. Puts and takes share one entry point;
         * the direction is selected by whether {@code e} is null.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }
190 
191     /**
192      * The number of times to spin before blocking in timed waits.
193      * The value is empirically derived -- it works well across a
194      * variety of processors and OSes. Empirically, the best value
195      * seems not to vary with number of CPUs (beyond 2) so is just
196      * a constant.
197      */
198     static final int MAX_TIMED_SPINS =
199         (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32;
200 
201     /**
202      * The number of times to spin before blocking in untimed waits.
203      * This is greater than timed value because untimed waits spin
204      * faster since they don't need to check times on each spin.
205      */
206     static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16;
207 
208     /**
209      * The number of nanoseconds for which it is faster to spin
210      * rather than to use timed park. A rough estimate suffices.
211      */
212     static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
213 
214     /** Dual stack */
215     static final class TransferStack<E> extends Transferer<E> {
216         /*
217          * This extends Scherer-Scott dual stack algorithm, differing,
218          * among other ways, by using "covering" nodes rather than
219          * bit-marked pointers: Fulfilling operations push on marker
220          * nodes (with FULFILLING bit set in mode) to reserve a spot
221          * to match a waiting node.
222          */
223 
224         /* Modes for SNodes, ORed together in node fields */
225         /** Node represents an unfulfilled consumer */
226         static final int REQUEST    = 0;
227         /** Node represents an unfulfilled producer */
228         static final int DATA       = 1;
229         /** Node is fulfilling another unfulfilled DATA or REQUEST */
230         static final int FULFILLING = 2;
231 
232         /** Returns true if m has fulfilling bit set. */
isFulfilling(int m)233         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
234 
235         /** Node class for TransferStacks. */
236         static final class SNode {
237             volatile SNode next;        // next node in stack
238             volatile SNode match;       // the node matched to this
239             volatile Thread waiter;     // to control park/unpark
240             Object item;                // data; or null for REQUESTs
241             int mode;
242             // Note: item and mode fields don't need to be volatile
243             // since they are always written before, and read after,
244             // other volatile/atomic operations.
245 
SNode(Object item)246             SNode(Object item) {
247                 this.item = item;
248             }
249 
casNext(SNode cmp, SNode val)250             boolean casNext(SNode cmp, SNode val) {
251                 return cmp == next &&
252                     SNEXT.compareAndSet(this, cmp, val);
253             }
254 
255             /**
256              * Tries to match node s to this node, if so, waking up thread.
257              * Fulfillers call tryMatch to identify their waiters.
258              * Waiters block until they have been matched.
259              *
260              * @param s the node to match
261              * @return true if successfully matched to s
262              */
tryMatch(SNode s)263             boolean tryMatch(SNode s) {
264                 if (match == null &&
265                     SMATCH.compareAndSet(this, null, s)) {
266                     Thread w = waiter;
267                     if (w != null) {    // waiters need at most one unpark
268                         waiter = null;
269                         LockSupport.unpark(w);
270                     }
271                     return true;
272                 }
273                 return match == s;
274             }
275 
276             /**
277              * Tries to cancel a wait by matching node to itself.
278              */
tryCancel()279             void tryCancel() {
280                 SMATCH.compareAndSet(this, null, this);
281             }
282 
isCancelled()283             boolean isCancelled() {
284                 return match == this;
285             }
286 
287             // VarHandle mechanics
288             private static final VarHandle SMATCH;
289             private static final VarHandle SNEXT;
290             static {
291                 try {
292                     MethodHandles.Lookup l = MethodHandles.lookup();
293                     SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
294                     SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);
295                 } catch (ReflectiveOperationException e) {
296                     throw new ExceptionInInitializerError(e);
297                 }
298             }
299         }
300 
301         /** The head (top) of the stack */
302         volatile SNode head;
303 
casHead(SNode h, SNode nh)304         boolean casHead(SNode h, SNode nh) {
305             return h == head &&
306                 SHEAD.compareAndSet(this, h, nh);
307         }
308 
309         /**
310          * Creates or resets fields of a node. Called only from transfer
311          * where the node to push on stack is lazily created and
312          * reused when possible to help reduce intervals between reads
313          * and CASes of head and to avoid surges of garbage when CASes
314          * to push nodes fail due to contention.
315          */
snode(SNode s, Object e, SNode next, int mode)316         static SNode snode(SNode s, Object e, SNode next, int mode) {
317             if (s == null) s = new SNode(e);
318             s.mode = mode;
319             s.next = next;
320             return s;
321         }
322 
        /**
         * Puts or takes an item.
         *
         * @param e the item to hand off; a null value makes this call a
         *          REQUEST (take), a non-null value makes it DATA (put)
         * @param timed whether the wait is bounded by {@code nanos}
         * @param nanos the timeout in nanoseconds; when {@code timed}
         *          is true and this is not positive the call does not
         *          wait
         * @return for a REQUEST, the matched node's item; for DATA, the
         *         item that was handed off; or null if the call could
         *         not wait or its wait was cancelled
         */
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0L) {     // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        // s is now on the stack; wait for a fulfiller
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        // s's fulfiller sits directly above s; if it is
                        // still at head, pop it together with s
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;      // read before the match attempt
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
403 
        /**
         * Spins/blocks until node s is matched by a fulfill operation.
         * An interrupt or an expired timeout is turned into a
         * cancellation attempt on s, after which the loop re-reads
         * {@code s.match} and returns whatever it finds there.
         *
         * @param s the waiting node
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched node, or s if cancelled
         */
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            /*
             * When a node/thread is about to block, it sets its waiter
             * field and then rechecks state at least one more time
             * before actually parking, thus covering race vs
             * fulfiller noticing that waiter is non-null so should be
             * woken.
             *
             * When invoked by nodes that appear at the point of call
             * to be at the head of the stack, calls to park are
             * preceded by spins to avoid blocking when producers and
             * consumers are arriving very close in time.  This can
             * happen enough to bother only on multiprocessors.
             *
             * The order of checks for returning out of main loop
             * reflects fact that interrupts have precedence over
             * normal returns, which have precedence over
             * timeouts. (So, on timeout, one last check for match is
             * done before giving up.) Except that calls from untimed
             * SynchronousQueue.{poll/offer} don't check interrupts
             * and don't wait at all, so are trapped in transfer
             * method rather than calling awaitFulfill.
             */
            // deadline is only meaningful for timed waits; 0L is a placeholder otherwise
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            // spin budget: nonzero only if s currently looks worth spinning for
            int spins = shouldSpin(s)
                ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
                : 0;
            for (;;) {
                // isInterrupted() does not clear the interrupt status
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;           // matched, or s itself if cancelled
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();  // timed out; loop once more to read match
                        continue;
                    }
                }
                if (spins > 0) {
                    // Android-removed: remove usage of Thread.onSpinWait. http://b/202837191
                    // Thread.onSpinWait();
                    spins = shouldSpin(s) ? (spins - 1) : 0;
                }
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanos);
                // otherwise too little time remains to park: keep looping
            }
        }
466 
467         /**
468          * Returns true if node s is at head or there is an active
469          * fulfiller.
470          */
shouldSpin(SNode s)471         boolean shouldSpin(SNode s) {
472             SNode h = head;
473             return (h == s || h == null || isFulfilling(h.mode));
474         }
475 
        /**
         * Unlinks s from the stack. Also clears s's item and waiter
         * fields, and unlinks other cancelled nodes encountered
         * between the head and the stopping point.
         *
         * @param s the node to remove; transfer passes a node whose
         *          wait was cancelled
         */
        void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */

            // past is the stopping point for both loops below
            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes
            // p is the first head that was null, past, or not cancelled
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);   // stay on p and re-examine its new successor
                else
                    p = n;
            }
        }
512 
513         // VarHandle mechanics
514         private static final VarHandle SHEAD;
515         static {
516             try {
517                 MethodHandles.Lookup l = MethodHandles.lookup();
518                 SHEAD = l.findVarHandle(TransferStack.class, "head", SNode.class);
519             } catch (ReflectiveOperationException e) {
520                 throw new ExceptionInInitializerError(e);
521             }
522         }
523     }
524 
525     /** Dual Queue */
526     static final class TransferQueue<E> extends Transferer<E> {
527         /*
528          * This extends Scherer-Scott dual queue algorithm, differing,
529          * among other ways, by using modes within nodes rather than
530          * marked pointers. The algorithm is a little simpler than
531          * that for stacks because fulfillers do not need explicit
532          * nodes, and matching is done by CAS'ing QNode.item field
533          * from non-null to null (for put) or vice versa (for take).
534          */
535 
536         /** Node class for TransferQueue. */
537         static final class QNode {
538             volatile QNode next;          // next node in queue
539             volatile Object item;         // CAS'ed to or from null
540             volatile Thread waiter;       // to control park/unpark
541             final boolean isData;
542 
QNode(Object item, boolean isData)543             QNode(Object item, boolean isData) {
544                 this.item = item;
545                 this.isData = isData;
546             }
547 
casNext(QNode cmp, QNode val)548             boolean casNext(QNode cmp, QNode val) {
549                 return next == cmp &&
550                     QNEXT.compareAndSet(this, cmp, val);
551             }
552 
casItem(Object cmp, Object val)553             boolean casItem(Object cmp, Object val) {
554                 return item == cmp &&
555                     QITEM.compareAndSet(this, cmp, val);
556             }
557 
558             /**
559              * Tries to cancel by CAS'ing ref to this as item.
560              */
tryCancel(Object cmp)561             void tryCancel(Object cmp) {
562                 QITEM.compareAndSet(this, cmp, this);
563             }
564 
isCancelled()565             boolean isCancelled() {
566                 return item == this;
567             }
568 
569             /**
570              * Returns true if this node is known to be off the queue
571              * because its next pointer has been forgotten due to
572              * an advanceHead operation.
573              */
isOffList()574             boolean isOffList() {
575                 return next == this;
576             }
577 
578             // VarHandle mechanics
579             private static final VarHandle QITEM;
580             private static final VarHandle QNEXT;
581             static {
582                 try {
583                     MethodHandles.Lookup l = MethodHandles.lookup();
584                     QITEM = l.findVarHandle(QNode.class, "item", Object.class);
585                     QNEXT = l.findVarHandle(QNode.class, "next", QNode.class);
586                 } catch (ReflectiveOperationException e) {
587                     throw new ExceptionInInitializerError(e);
588                 }
589             }
590         }
591 
592         /** Head of queue */
593         transient volatile QNode head;
594         /** Tail of queue */
595         transient volatile QNode tail;
596         /**
597          * Reference to a cancelled node that might not yet have been
598          * unlinked from queue because it was the last inserted node
599          * when it was cancelled.
600          */
601         transient volatile QNode cleanMe;
602 
TransferQueue()603         TransferQueue() {
604             QNode h = new QNode(null, false); // initialize to dummy node.
605             head = h;
606             tail = h;
607         }
608 
609         /**
610          * Tries to cas nh as new head; if successful, unlink
611          * old head's next node to avoid garbage retention.
612          */
advanceHead(QNode h, QNode nh)613         void advanceHead(QNode h, QNode nh) {
614             if (h == head &&
615                 QHEAD.compareAndSet(this, h, nh))
616                 h.next = h; // forget old next
617         }
618 
619         /**
620          * Tries to cas nt as new tail.
621          */
advanceTail(QNode t, QNode nt)622         void advanceTail(QNode t, QNode nt) {
623             if (tail == t)
624                 QTAIL.compareAndSet(this, t, nt);
625         }
626 
627         /**
628          * Tries to CAS cleanMe slot.
629          */
casCleanMe(QNode cmp, QNode val)630         boolean casCleanMe(QNode cmp, QNode val) {
631             return cleanMe == cmp &&
632                 QCLEANME.compareAndSet(this, cmp, val);
633         }
634 
635         /**
636          * Puts or takes an item.
637          */
638         @SuppressWarnings("unchecked")
transfer(E e, boolean timed, long nanos)639         E transfer(E e, boolean timed, long nanos) {
640             /* Basic algorithm is to loop trying to take either of
641              * two actions:
642              *
643              * 1. If queue apparently empty or holding same-mode nodes,
644              *    try to add node to queue of waiters, wait to be
645              *    fulfilled (or cancelled) and return matching item.
646              *
647              * 2. If queue apparently contains waiting items, and this
648              *    call is of complementary mode, try to fulfill by CAS'ing
649              *    item field of waiting node and dequeuing it, and then
650              *    returning matching item.
651              *
652              * In each case, along the way, check for and try to help
653              * advance head and tail on behalf of other stalled/slow
654              * threads.
655              *
656              * The loop starts off with a null check guarding against
657              * seeing uninitialized head or tail values. This never
658              * happens in current SynchronousQueue, but could if
659              * callers held non-volatile/final ref to the
660              * transferer. The check is here anyway because it places
661              * null checks at top of loop, which is usually faster
662              * than having them implicitly interspersed.
663              */
664 
665             QNode s = null; // constructed/reused as needed
666             boolean isData = (e != null);
667 
668             for (;;) {
669                 QNode t = tail;
670                 QNode h = head;
671                 if (t == null || h == null)         // saw uninitialized value
672                     continue;                       // spin
673 
674                 if (h == t || t.isData == isData) { // empty or same-mode
675                     QNode tn = t.next;
676                     if (t != tail)                  // inconsistent read
677                         continue;
678                     if (tn != null) {               // lagging tail
679                         advanceTail(t, tn);
680                         continue;
681                     }
682                     if (timed && nanos <= 0L)       // can't wait
683                         return null;
684                     if (s == null)
685                         s = new QNode(e, isData);
686                     if (!t.casNext(null, s))        // failed to link in
687                         continue;
688 
689                     advanceTail(t, s);              // swing tail and wait
690                     Object x = awaitFulfill(s, e, timed, nanos);
691                     if (x == s) {                   // wait was cancelled
692                         clean(t, s);
693                         return null;
694                     }
695 
696                     if (!s.isOffList()) {           // not already unlinked
697                         advanceHead(t, s);          // unlink if head
698                         if (x != null)              // and forget fields
699                             s.item = s;
700                         s.waiter = null;
701                     }
702                     return (x != null) ? (E)x : e;
703 
704                 } else {                            // complementary-mode
705                     QNode m = h.next;               // node to fulfill
706                     if (t != tail || m == null || h != head)
707                         continue;                   // inconsistent read
708 
709                     Object x = m.item;
710                     if (isData == (x != null) ||    // m already fulfilled
711                         x == m ||                   // m cancelled
712                         !m.casItem(x, e)) {         // lost CAS
713                         advanceHead(h, m);          // dequeue and retry
714                         continue;
715                     }
716 
717                     advanceHead(h, m);              // successfully fulfilled
718                     LockSupport.unpark(m.waiter);
719                     return (x != null) ? (E)x : e;
720                 }
721             }
722         }
723 
724         /**
725          * Spins/blocks until node s is fulfilled.
726          *
727          * @param s the waiting node
728          * @param e the comparison value for checking match
729          * @param timed true if timed wait
730          * @param nanos timeout value
731          * @return matched item, or s if cancelled
732          */
awaitFulfill(QNode s, E e, boolean timed, long nanos)733         Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
734             /* Same idea as TransferStack.awaitFulfill */
735             final long deadline = timed ? System.nanoTime() + nanos : 0L;
736             Thread w = Thread.currentThread();
737             int spins = (head.next == s)
738                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
739                 : 0;
740             for (;;) {
741                 if (w.isInterrupted())
742                     s.tryCancel(e);
743                 Object x = s.item;
744                 if (x != e)
745                     return x;
746                 if (timed) {
747                     nanos = deadline - System.nanoTime();
748                     if (nanos <= 0L) {
749                         s.tryCancel(e);
750                         continue;
751                     }
752                 }
753                 if (spins > 0) {
754                     --spins;
755                     // Android-removed: remove usage of Thread.onSpinWait. http://b/202837191
756                     // Thread.onSpinWait();
757                 }
758                 else if (s.waiter == null)
759                     s.waiter = w;
760                 else if (!timed)
761                     LockSupport.park(this);
762                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
763                     LockSupport.parkNanos(this, nanos);
764             }
765         }
766 
767         /**
768          * Gets rid of cancelled node s with original predecessor pred.
769          */
clean(QNode pred, QNode s)770         void clean(QNode pred, QNode s) {
771             s.waiter = null; // forget thread
772             /*
773              * At any given time, exactly one node on list cannot be
774              * deleted -- the last inserted node. To accommodate this,
775              * if we cannot delete s, we save its predecessor as
776              * "cleanMe", deleting the previously saved version
777              * first. At least one of node s or the node previously
778              * saved can always be deleted, so this always terminates.
779              */
780             while (pred.next == s) { // Return early if already unlinked
781                 QNode h = head;
782                 QNode hn = h.next;   // Absorb cancelled first node as head
783                 if (hn != null && hn.isCancelled()) {
784                     advanceHead(h, hn);
785                     continue;
786                 }
787                 QNode t = tail;      // Ensure consistent read for tail
788                 if (t == h)
789                     return;
790                 QNode tn = t.next;
791                 if (t != tail)
792                     continue;
793                 if (tn != null) {
794                     advanceTail(t, tn);
795                     continue;
796                 }
797                 if (s != t) {        // If not tail, try to unsplice
798                     QNode sn = s.next;
799                     if (sn == s || pred.casNext(s, sn))
800                         return;
801                 }
802                 QNode dp = cleanMe;
803                 if (dp != null) {    // Try unlinking previous cancelled node
804                     QNode d = dp.next;
805                     QNode dn;
806                     if (d == null ||               // d is gone or
807                         d == dp ||                 // d is off list or
808                         !d.isCancelled() ||        // d not cancelled or
809                         (d != t &&                 // d not tail and
810                          (dn = d.next) != null &&  //   has successor
811                          dn != d &&                //   that is on list
812                          dp.casNext(d, dn)))       // d unspliced
813                         casCleanMe(dp, null);
814                     if (dp == pred)
815                         return;      // s is already saved node
816                 } else if (casCleanMe(null, pred))
817                     return;          // Postpone cleaning s
818             }
819         }
820 
// VarHandle mechanics: handles for the "head", "tail" and "cleanMe" fields.
private static final VarHandle QHEAD;
private static final VarHandle QTAIL;
private static final VarHandle QCLEANME;
static {
    try {
        final MethodHandles.Lookup lookup = MethodHandles.lookup();
        final Class<?> owner = TransferQueue.class;
        final Class<?> fieldType = QNode.class;
        QHEAD = lookup.findVarHandle(owner, "head", fieldType);
        QTAIL = lookup.findVarHandle(owner, "tail", fieldType);
        QCLEANME = lookup.findVarHandle(owner, "cleanMe", fieldType);
    } catch (ReflectiveOperationException ex) {
        // A failed lookup is surfaced as a class-initialization failure.
        throw new ExceptionInInitializerError(ex);
    }
}
838     }
839 
/**
 * The transferer. Assigned in the constructor and again in
 * {@code readObject} when an instance is deserialized (the field is
 * transient), which is why it cannot be declared final without
 * further complicating serialization.  Since this is accessed only
 * at most once per public method, there isn't a noticeable
 * performance penalty for using volatile instead of final here.
 */
private transient volatile Transferer<E> transferer;
848 
/**
 * Creates a {@code SynchronousQueue} with nonfair access policy.
 * Equivalent to {@code new SynchronousQueue(false)}.
 */
public SynchronousQueue() {
    // Delegate to the fairness-taking constructor with fair == false.
    this(false);
}
855 
/**
 * Creates a {@code SynchronousQueue} with the specified fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
    // Fair mode is backed by the queue transferer, nonfair by the stack.
    if (fair)
        transferer = new TransferQueue<E>();
    else
        transferer = new TransferStack<E>();
}
865 
866     /**
867      * Adds the specified element to this queue, waiting if necessary for
868      * another thread to receive it.
869      *
870      * @throws InterruptedException {@inheritDoc}
871      * @throws NullPointerException {@inheritDoc}
872      */
put(E e)873     public void put(E e) throws InterruptedException {
874         if (e == null) throw new NullPointerException();
875         if (transferer.transfer(e, false, 0) == null) {
876             Thread.interrupted();
877             throw new InterruptedException();
878         }
879     }
880 
881     /**
882      * Inserts the specified element into this queue, waiting if necessary
883      * up to the specified wait time for another thread to receive it.
884      *
885      * @return {@code true} if successful, or {@code false} if the
886      *         specified waiting time elapses before a consumer appears
887      * @throws InterruptedException {@inheritDoc}
888      * @throws NullPointerException {@inheritDoc}
889      */
offer(E e, long timeout, TimeUnit unit)890     public boolean offer(E e, long timeout, TimeUnit unit)
891         throws InterruptedException {
892         if (e == null) throw new NullPointerException();
893         if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
894             return true;
895         if (!Thread.interrupted())
896             return false;
897         throw new InterruptedException();
898     }
899 
900     /**
901      * Inserts the specified element into this queue, if another thread is
902      * waiting to receive it.
903      *
904      * @param e the element to add
905      * @return {@code true} if the element was added to this queue, else
906      *         {@code false}
907      * @throws NullPointerException if the specified element is null
908      */
offer(E e)909     public boolean offer(E e) {
910         if (e == null) throw new NullPointerException();
911         return transferer.transfer(e, true, 0) != null;
912     }
913 
914     /**
915      * Retrieves and removes the head of this queue, waiting if necessary
916      * for another thread to insert it.
917      *
918      * @return the head of this queue
919      * @throws InterruptedException {@inheritDoc}
920      */
take()921     public E take() throws InterruptedException {
922         E e = transferer.transfer(null, false, 0);
923         if (e != null)
924             return e;
925         Thread.interrupted();
926         throw new InterruptedException();
927     }
928 
929     /**
930      * Retrieves and removes the head of this queue, waiting
931      * if necessary up to the specified wait time, for another thread
932      * to insert it.
933      *
934      * @return the head of this queue, or {@code null} if the
935      *         specified waiting time elapses before an element is present
936      * @throws InterruptedException {@inheritDoc}
937      */
poll(long timeout, TimeUnit unit)938     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
939         E e = transferer.transfer(null, true, unit.toNanos(timeout));
940         if (e != null || !Thread.interrupted())
941             return e;
942         throw new InterruptedException();
943     }
944 
945     /**
946      * Retrieves and removes the head of this queue, if another thread
947      * is currently making an element available.
948      *
949      * @return the head of this queue, or {@code null} if no
950      *         element is available
951      */
poll()952     public E poll() {
953         return transferer.transfer(null, true, 0);
954     }
955 
956     /**
957      * Always returns {@code true}.
958      * A {@code SynchronousQueue} has no internal capacity.
959      *
960      * @return {@code true}
961      */
isEmpty()962     public boolean isEmpty() {
963         return true;
964     }
965 
966     /**
967      * Always returns zero.
968      * A {@code SynchronousQueue} has no internal capacity.
969      *
970      * @return zero
971      */
size()972     public int size() {
973         return 0;
974     }
975 
976     /**
977      * Always returns zero.
978      * A {@code SynchronousQueue} has no internal capacity.
979      *
980      * @return zero
981      */
remainingCapacity()982     public int remainingCapacity() {
983         return 0;
984     }
985 
986     /**
987      * Does nothing.
988      * A {@code SynchronousQueue} has no internal capacity.
989      */
clear()990     public void clear() {
991     }
992 
993     /**
994      * Always returns {@code false}.
995      * A {@code SynchronousQueue} has no internal capacity.
996      *
997      * @param o the element
998      * @return {@code false}
999      */
contains(Object o)1000     public boolean contains(Object o) {
1001         return false;
1002     }
1003 
1004     /**
1005      * Always returns {@code false}.
1006      * A {@code SynchronousQueue} has no internal capacity.
1007      *
1008      * @param o the element to remove
1009      * @return {@code false}
1010      */
remove(Object o)1011     public boolean remove(Object o) {
1012         return false;
1013     }
1014 
1015     /**
1016      * Returns {@code false} unless the given collection is empty.
1017      * A {@code SynchronousQueue} has no internal capacity.
1018      *
1019      * @param c the collection
1020      * @return {@code false} unless given collection is empty
1021      */
containsAll(Collection<?> c)1022     public boolean containsAll(Collection<?> c) {
1023         return c.isEmpty();
1024     }
1025 
1026     /**
1027      * Always returns {@code false}.
1028      * A {@code SynchronousQueue} has no internal capacity.
1029      *
1030      * @param c the collection
1031      * @return {@code false}
1032      */
removeAll(Collection<?> c)1033     public boolean removeAll(Collection<?> c) {
1034         return false;
1035     }
1036 
1037     /**
1038      * Always returns {@code false}.
1039      * A {@code SynchronousQueue} has no internal capacity.
1040      *
1041      * @param c the collection
1042      * @return {@code false}
1043      */
retainAll(Collection<?> c)1044     public boolean retainAll(Collection<?> c) {
1045         return false;
1046     }
1047 
1048     /**
1049      * Always returns {@code null}.
1050      * A {@code SynchronousQueue} does not return elements
1051      * unless actively waited on.
1052      *
1053      * @return {@code null}
1054      */
peek()1055     public E peek() {
1056         return null;
1057     }
1058 
1059     /**
1060      * Returns an empty iterator in which {@code hasNext} always returns
1061      * {@code false}.
1062      *
1063      * @return an empty iterator
1064      */
iterator()1065     public Iterator<E> iterator() {
1066         return Collections.emptyIterator();
1067     }
1068 
1069     /**
1070      * Returns an empty spliterator in which calls to
1071      * {@link Spliterator#trySplit() trySplit} always return {@code null}.
1072      *
1073      * @return an empty spliterator
1074      * @since 1.8
1075      */
spliterator()1076     public Spliterator<E> spliterator() {
1077         return Spliterators.emptySpliterator();
1078     }
1079 
1080     /**
1081      * Returns a zero-length array.
1082      * @return a zero-length array
1083      */
toArray()1084     public Object[] toArray() {
1085         return new Object[0];
1086     }
1087 
1088     /**
1089      * Sets the zeroth element of the specified array to {@code null}
1090      * (if the array has non-zero length) and returns it.
1091      *
1092      * @param a the array
1093      * @return the specified array
1094      * @throws NullPointerException if the specified array is null
1095      */
toArray(T[] a)1096     public <T> T[] toArray(T[] a) {
1097         if (a.length > 0)
1098             a[0] = null;
1099         return a;
1100     }
1101 
1102     /**
1103      * Always returns {@code "[]"}.
1104      * @return {@code "[]"}
1105      */
toString()1106     public String toString() {
1107         return "[]";
1108     }
1109 
1110     /**
1111      * @throws UnsupportedOperationException {@inheritDoc}
1112      * @throws ClassCastException            {@inheritDoc}
1113      * @throws NullPointerException          {@inheritDoc}
1114      * @throws IllegalArgumentException      {@inheritDoc}
1115      */
drainTo(Collection<? super E> c)1116     public int drainTo(Collection<? super E> c) {
1117         Objects.requireNonNull(c);
1118         if (c == this)
1119             throw new IllegalArgumentException();
1120         int n = 0;
1121         for (E e; (e = poll()) != null; n++)
1122             c.add(e);
1123         return n;
1124     }
1125 
1126     /**
1127      * @throws UnsupportedOperationException {@inheritDoc}
1128      * @throws ClassCastException            {@inheritDoc}
1129      * @throws NullPointerException          {@inheritDoc}
1130      * @throws IllegalArgumentException      {@inheritDoc}
1131      */
drainTo(Collection<? super E> c, int maxElements)1132     public int drainTo(Collection<? super E> c, int maxElements) {
1133         Objects.requireNonNull(c);
1134         if (c == this)
1135             throw new IllegalArgumentException();
1136         int n = 0;
1137         for (E e; n < maxElements && (e = poll()) != null; n++)
1138             c.add(e);
1139         return n;
1140     }
1141 
/*
 * To cope with serialization strategy in the 1.5 version of
 * SynchronousQueue, we declare some unused classes and fields
 * that exist solely to enable serializability across versions.
 * These fields are never used, so are initialized only if this
 * object is ever serialized or deserialized.
 *
 * writeObject assigns the three fields below immediately before
 * calling defaultWriteObject, choosing the Fifo or Lifo marker class
 * according to the fairness of the current transferer. readObject
 * checks whether waitingProducers is a FifoWaitQueue to decide which
 * transferer to recreate.
 */

// Common serializable base of the two marker classes.
@SuppressWarnings("serial")
static class WaitQueue implements java.io.Serializable { }
// Marker written for a nonfair queue.
static class LifoWaitQueue extends WaitQueue {
    private static final long serialVersionUID = -3633113410248163686L;
}
// Marker written for a fair queue.
static class FifoWaitQueue extends WaitQueue {
    private static final long serialVersionUID = -3623113410248163686L;
}
// Assigned only in writeObject; not read by any queue operation.
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;
1161 
1162     /**
1163      * Saves this queue to a stream (that is, serializes it).
1164      * @param s the stream
1165      * @throws java.io.IOException if an I/O error occurs
1166      */
writeObject(java.io.ObjectOutputStream s)1167     private void writeObject(java.io.ObjectOutputStream s)
1168         throws java.io.IOException {
1169         boolean fair = transferer instanceof TransferQueue;
1170         if (fair) {
1171             qlock = new ReentrantLock(true);
1172             waitingProducers = new FifoWaitQueue();
1173             waitingConsumers = new FifoWaitQueue();
1174         }
1175         else {
1176             qlock = new ReentrantLock();
1177             waitingProducers = new LifoWaitQueue();
1178             waitingConsumers = new LifoWaitQueue();
1179         }
1180         s.defaultWriteObject();
1181     }
1182 
1183     /**
1184      * Reconstitutes this queue from a stream (that is, deserializes it).
1185      * @param s the stream
1186      * @throws ClassNotFoundException if the class of a serialized object
1187      *         could not be found
1188      * @throws java.io.IOException if an I/O error occurs
1189      */
readObject(java.io.ObjectInputStream s)1190     private void readObject(java.io.ObjectInputStream s)
1191         throws java.io.IOException, ClassNotFoundException {
1192         s.defaultReadObject();
1193         if (waitingProducers instanceof FifoWaitQueue)
1194             transferer = new TransferQueue<E>();
1195         else
1196             transferer = new TransferStack<E>();
1197     }
1198 
static {
    // Referencing LockSupport here reduces the risk of rare disastrous
    // classloading in the first call to LockSupport.park:
    // https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> preloaded = LockSupport.class;
}
1204 }
1205