• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea, Bill Scherer, and Michael Scott with
32  * assistance from members of JCP JSR-166 Expert Group and released to
33  * the public domain, as explained at
34  * http://creativecommons.org/publicdomain/zero/1.0/
35  */
36 
37 package java.util.concurrent;
38 
39 import java.lang.invoke.MethodHandles;
40 import java.lang.invoke.VarHandle;
41 import java.util.AbstractQueue;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.Iterator;
45 import java.util.Objects;
46 import java.util.Spliterator;
47 import java.util.Spliterators;
48 import java.util.concurrent.locks.LockSupport;
49 import java.util.concurrent.locks.ReentrantLock;
50 import java.util.concurrent.ForkJoinWorkerThread;
51 import java.util.concurrent.LinkedTransferQueue;
52 import java.util.concurrent.TransferQueue;
53 
54 /**
55  * A {@linkplain BlockingQueue blocking queue} in which each insert
56  * operation must wait for a corresponding remove operation by another
57  * thread, and vice versa.  A synchronous queue does not have any
58  * internal capacity, not even a capacity of one.  You cannot
59  * {@code peek} at a synchronous queue because an element is only
60  * present when you try to remove it; you cannot insert an element
61  * (using any method) unless another thread is trying to remove it;
62  * you cannot iterate as there is nothing to iterate.  The
63  * <em>head</em> of the queue is the element that the first queued
64  * inserting thread is trying to add to the queue; if there is no such
65  * queued thread then no element is available for removal and
66  * {@code poll()} will return {@code null}.  For purposes of other
67  * {@code Collection} methods (for example {@code contains}), a
68  * {@code SynchronousQueue} acts as an empty collection.  This queue
69  * does not permit {@code null} elements.
70  *
71  * <p>Synchronous queues are similar to rendezvous channels used in
72  * CSP and Ada. They are well suited for handoff designs, in which an
73  * object running in one thread must sync up with an object running
74  * in another thread in order to hand it some information, event, or
75  * task.
76  *
77  * <p>This class supports an optional fairness policy for ordering
78  * waiting producer and consumer threads.  By default, this ordering
79  * is not guaranteed. However, a queue constructed with fairness set
80  * to {@code true} grants threads access in FIFO order.
81  *
82  * <p>This class and its iterator implement all of the <em>optional</em>
83  * methods of the {@link Collection} and {@link Iterator} interfaces.
84  *
85  * <p>This class is a member of the
86  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
87  * Java Collections Framework</a>.
88  *
89  * @since 1.5
90  * @author Doug Lea and Bill Scherer and Michael Scott
91  * @param <E> the type of elements held in this queue
92  */
93 public class SynchronousQueue<E> extends AbstractQueue<E>
94     implements BlockingQueue<E>, java.io.Serializable {
95     private static final long serialVersionUID = -3223113410248163686L;
96 
97     /*
98      * This class implements extensions of the dual stack and dual
99      * queue algorithms described in "Nonblocking Concurrent Objects
100      * with Condition Synchronization", by W. N. Scherer III and
101      * M. L. Scott.  18th Annual Conf. on Distributed Computing,
102      * Oct. 2004 (see also
103      * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
104      * The queue is treated as a Lifo stack in non-fair mode, and a
105      * Fifo queue in fair mode. In most contexts, transfer performance
106      * is roughly comparable across them. Lifo is usually faster under
107      * low contention, but slower under high contention.  Performance
108      * of applications using them also varies. Lifo is generally
109      * preferable in resource management settings (for example cached
110      * thread pools) because of better temporal locality, but
111      * inappropriate for message-passing applications.
112      *
113      * A dual queue is one that at any given time either holds "data"
114      * -- items provided by put operations, or "requests" -- slots
115      * representing take operations, or is empty. A fulfilling
116      * operation (i.e., a call requesting an item from a queue holding
117      * data or vice versa) "matches" the item of and then dequeues a
118      * complementary node.  Any operation can figure out which mode
119      * the queue is in, and act accordingly without needing locks.  So
120      * put and take operations are symmetrical, and all transfer
121      * methods invoke a single "xfer" method that does a put or a take
122      * in either fifo or lifo mode.
123      *
124      * The algorithms here differ from the versions in the above paper
125      * in ways including:
126      *
127      *  * The original algorithms used bit-marked pointers, but the
128      *     ones here use a bit (isData) in nodes, and usually avoid
129      *     creating nodes when fulfilling. They also use the
130      *     compareAndExchange form of CAS for pointer updates to
131      *     reduce memory traffic.
132      *  * Fifo mode is based on LinkedTransferQueue operations, but
133      *     Lifo mode support is added in subclass Transferer.
134      *  * The Fifo version accommodates lazy updates and slack as
135      *     described in LinkedTransferQueue internal documentation.
136      *  * Threads may block when waiting to become fulfilled,
137      *     sometimes preceded by brief spins.
138      *  * Support for cancellation via timeout and interrupts,
139      *     including cleaning out cancelled nodes/threads from lists
140      *     to avoid garbage retention and memory depletion.
141      */
142 
143     /**
144      * Extension of LinkedTransferQueue to support Lifo (stack) mode.
145      * Methods use the "head" field as head (top) of stack (versus
146      * queue). Note that popped nodes are not self-linked because they
147      * are not prone to unbounded garbage chains. Also note that
148      * "async" mode is never used and not supported for synchronous
149      * transfers.
150      */
151     @SuppressWarnings("serial") // never serialized
152     static final class Transferer<E> extends LinkedTransferQueue<E> {
153 
154         /**
155          * Puts or takes an item with lifo ordering. Loops trying:
156          * * If top (var p) exists and is already matched, pop and continue
157          * * If top has complementary type, try to fulfill by CASing item,
158          *    On success pop (which will succeed unless already helped),
159          *    otherwise restart.
160          * * If no possible match, unless immediate mode, push a
161          *    node and wait, later unsplicing if cancelled.
162          *
163          * @param e the item or null for take
164          * @param ns timeout or 0 if immediate, Long.MAX_VALUE if untimed
165          * @return an item if matched, else e
166          */
xferLifo(Object e, long ns)167         final Object xferLifo(Object e, long ns) {
168             boolean haveData = (e != null);
169             Object m;                              // the match or e if none
170             outer: for (DualNode s = null, p = head;;) {
171                 while (p != null) {
172                     boolean isData; DualNode n, u; // help collapse
173                     if ((isData = p.isData) != ((m = p.item) != null))
174                         p = (p == (u = cmpExHead(p, (n = p.next)))) ? n : u;
175                     else if (isData == haveData)   // same mode; push below
176                         break;
177                     else if (p.cmpExItem(m, e) != m)
178                         p = head;                  // missed; restart
179                     else {                         // matched complementary node
180                         Thread w = p.waiter;
181                         cmpExHead(p, p.next);
182                         LockSupport.unpark(w);
183                         break outer;
184                     }
185                 }
186                 if (ns == 0L) {                    // no match, no wait
187                     m = e;
188                     break;
189                 }
190                 if (s == null)                     // try to push node and wait
191                     s = new DualNode(e, haveData);
192                 s.next = p;
193                 if (p == (p = cmpExHead(p, s))) {
194                     if ((m = s.await(e, ns, this,  // spin if (nearly) empty
195                                      p == null || p.waiter == null)) == e)
196                         unspliceLifo(s);           // cancelled
197                     break;
198                 }
199             }
200             return m;
201         }
202 
203         /**
204          * Unlinks node s. Same idea as Fifo version.
205          */
unspliceLifo(DualNode s)206         private void unspliceLifo(DualNode s) {
207             boolean seen = false; // try removing by collapsing head
208             DualNode p = head;
209             for (DualNode f, u; p != null && p.matched();) {
210                 if (p == s)
211                     seen = true;
212                 p = (p == (u = cmpExHead(p, (f = p.next)))) ? f : u;
213             }
214             if (p != null && !seen && sweepNow()) { // occasionally sweep
215                 for (DualNode f, n, u; p != null && (f = p.next) != null; ) {
216                     p = (!f.matched() ? f :
217                          f == (u = p.cmpExNext(f, n = f.next)) ? n : u);
218                 }
219             }
220         }
221     }
222 
223     /**
224      * The transferer. (See below about serialization.)
225      */
226     private final transient Transferer<E> transferer;
227 
228     private final transient boolean fair;
229 
230     /** Invokes fair or lifo transfer */
xfer(Object e, long nanos)231     private Object xfer(Object e, long nanos) {
232         Transferer<E> x = transferer;
233         return (fair) ? x.xfer(e, nanos) : x.xferLifo(e, nanos);
234     }
235 
236     /**
237      * Creates a {@code SynchronousQueue} with nonfair access policy.
238      */
SynchronousQueue()239     public SynchronousQueue() {
240         this(false);
241     }
242 
243     /**
244      * Creates a {@code SynchronousQueue} with the specified fairness policy.
245      *
246      * @param fair if true, waiting threads contend in FIFO order for
247      *        access; otherwise the order is unspecified.
248      */
SynchronousQueue(boolean fair)249     public SynchronousQueue(boolean fair) {
250         this.fair = fair;
251         transferer = new Transferer<E>();
252     }
253 
254     /**
255      * Adds the specified element to this queue, waiting if necessary for
256      * another thread to receive it.
257      *
258      * @throws InterruptedException {@inheritDoc}
259      * @throws NullPointerException {@inheritDoc}
260      */
put(E e)261     public void put(E e) throws InterruptedException {
262         Objects.requireNonNull(e);
263         if (!Thread.interrupted()) {
264             if (xfer(e, Long.MAX_VALUE) == null)
265                 return;
266             Thread.interrupted(); // failure possible only due to interrupt
267         }
268         throw new InterruptedException();
269     }
270 
271     /**
272      * Inserts the specified element into this queue, waiting if necessary
273      * up to the specified wait time for another thread to receive it.
274      *
275      * @return {@code true} if successful, or {@code false} if the
276      *         specified waiting time elapses before a consumer appears
277      * @throws InterruptedException {@inheritDoc}
278      * @throws NullPointerException {@inheritDoc}
279      */
offer(E e, long timeout, TimeUnit unit)280     public boolean offer(E e, long timeout, TimeUnit unit)
281         throws InterruptedException {
282         Objects.requireNonNull(e);
283         long nanos = Math.max(unit.toNanos(timeout), 0L);
284         if (xfer(e, nanos) == null)
285             return true;
286         if (!Thread.interrupted())
287             return false;
288         throw new InterruptedException();
289     }
290 
291     /**
292      * Inserts the specified element into this queue, if another thread is
293      * waiting to receive it.
294      *
295      * @param e the element to add
296      * @return {@code true} if the element was added to this queue, else
297      *         {@code false}
298      * @throws NullPointerException if the specified element is null
299      */
offer(E e)300     public boolean offer(E e) {
301         Objects.requireNonNull(e);
302         return xfer(e, 0L) == null;
303     }
304 
305     /**
306      * Retrieves and removes the head of this queue, waiting if necessary
307      * for another thread to insert it.
308      *
309      * @return the head of this queue
310      * @throws InterruptedException {@inheritDoc}
311      */
312     @SuppressWarnings("unchecked")
take()313     public E take() throws InterruptedException {
314         Object e;
315         if (!Thread.interrupted()) {
316             if ((e = xfer(null, Long.MAX_VALUE)) != null)
317                 return (E) e;
318             Thread.interrupted();
319         }
320         throw new InterruptedException();
321     }
322 
323     /**
324      * Retrieves and removes the head of this queue, waiting
325      * if necessary up to the specified wait time, for another thread
326      * to insert it.
327      *
328      * @return the head of this queue, or {@code null} if the
329      *         specified waiting time elapses before an element is present
330      * @throws InterruptedException {@inheritDoc}
331      */
332     @SuppressWarnings("unchecked")
poll(long timeout, TimeUnit unit)333     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
334         Object e;
335         long nanos = Math.max(unit.toNanos(timeout), 0L);
336         if ((e = xfer(null, nanos)) != null || !Thread.interrupted())
337             return (E) e;
338         throw new InterruptedException();
339     }
340 
341     /**
342      * Retrieves and removes the head of this queue, if another thread
343      * is currently making an element available.
344      *
345      * @return the head of this queue, or {@code null} if no
346      *         element is available
347      */
348     @SuppressWarnings("unchecked")
poll()349     public E poll() {
350         return (E) xfer(null, 0L);
351     }
352 
353     /**
354      * Always returns {@code true}.
355      * A {@code SynchronousQueue} has no internal capacity.
356      *
357      * @return {@code true}
358      */
isEmpty()359     public boolean isEmpty() {
360         return true;
361     }
362 
363     /**
364      * Always returns zero.
365      * A {@code SynchronousQueue} has no internal capacity.
366      *
367      * @return zero
368      */
size()369     public int size() {
370         return 0;
371     }
372 
373     /**
374      * Always returns zero.
375      * A {@code SynchronousQueue} has no internal capacity.
376      *
377      * @return zero
378      */
remainingCapacity()379     public int remainingCapacity() {
380         return 0;
381     }
382 
383     /**
384      * Does nothing.
385      * A {@code SynchronousQueue} has no internal capacity.
386      */
clear()387     public void clear() {
388     }
389 
390     /**
391      * Always returns {@code false}.
392      * A {@code SynchronousQueue} has no internal capacity.
393      *
394      * @param o the element
395      * @return {@code false}
396      */
contains(Object o)397     public boolean contains(Object o) {
398         return false;
399     }
400 
401     /**
402      * Always returns {@code false}.
403      * A {@code SynchronousQueue} has no internal capacity.
404      *
405      * @param o the element to remove
406      * @return {@code false}
407      */
remove(Object o)408     public boolean remove(Object o) {
409         return false;
410     }
411 
412     /**
413      * Returns {@code false} unless the given collection is empty.
414      * A {@code SynchronousQueue} has no internal capacity.
415      *
416      * @param c the collection
417      * @return {@code false} unless given collection is empty
418      */
containsAll(Collection<?> c)419     public boolean containsAll(Collection<?> c) {
420         return c.isEmpty();
421     }
422 
423     /**
424      * Always returns {@code false}.
425      * A {@code SynchronousQueue} has no internal capacity.
426      *
427      * @param c the collection
428      * @return {@code false}
429      */
removeAll(Collection<?> c)430     public boolean removeAll(Collection<?> c) {
431         return false;
432     }
433 
434     /**
435      * Always returns {@code false}.
436      * A {@code SynchronousQueue} has no internal capacity.
437      *
438      * @param c the collection
439      * @return {@code false}
440      */
retainAll(Collection<?> c)441     public boolean retainAll(Collection<?> c) {
442         return false;
443     }
444 
445     /**
446      * Always returns {@code null}.
447      * A {@code SynchronousQueue} does not return elements
448      * unless actively waited on.
449      *
450      * @return {@code null}
451      */
peek()452     public E peek() {
453         return null;
454     }
455 
456     /**
457      * Returns an empty iterator in which {@code hasNext} always returns
458      * {@code false}.
459      *
460      * @return an empty iterator
461      */
iterator()462     public Iterator<E> iterator() {
463         return Collections.emptyIterator();
464     }
465 
466     /**
467      * Returns an empty spliterator in which calls to
468      * {@link Spliterator#trySplit() trySplit} always return {@code null}.
469      *
470      * @return an empty spliterator
471      * @since 1.8
472      */
spliterator()473     public Spliterator<E> spliterator() {
474         return Spliterators.emptySpliterator();
475     }
476 
477     /**
478      * Returns a zero-length array.
479      * @return a zero-length array
480      */
toArray()481     public Object[] toArray() {
482         return new Object[0];
483     }
484 
485     /**
486      * Sets the zeroth element of the specified array to {@code null}
487      * (if the array has non-zero length) and returns it.
488      *
489      * @param a the array
490      * @return the specified array
491      * @throws NullPointerException if the specified array is null
492      */
toArray(T[] a)493     public <T> T[] toArray(T[] a) {
494         if (a.length > 0)
495             a[0] = null;
496         return a;
497     }
498 
499     /**
500      * Always returns {@code "[]"}.
501      * @return {@code "[]"}
502      */
toString()503     public String toString() {
504         return "[]";
505     }
506 
507     /**
508      * @throws UnsupportedOperationException {@inheritDoc}
509      * @throws ClassCastException            {@inheritDoc}
510      * @throws NullPointerException          {@inheritDoc}
511      * @throws IllegalArgumentException      {@inheritDoc}
512      */
drainTo(Collection<? super E> c)513     public int drainTo(Collection<? super E> c) {
514         Objects.requireNonNull(c);
515         if (c == this)
516             throw new IllegalArgumentException();
517         int n = 0;
518         for (E e; (e = poll()) != null; n++)
519             c.add(e);
520         return n;
521     }
522 
523     /**
524      * @throws UnsupportedOperationException {@inheritDoc}
525      * @throws ClassCastException            {@inheritDoc}
526      * @throws NullPointerException          {@inheritDoc}
527      * @throws IllegalArgumentException      {@inheritDoc}
528      */
drainTo(Collection<? super E> c, int maxElements)529     public int drainTo(Collection<? super E> c, int maxElements) {
530         Objects.requireNonNull(c);
531         if (c == this)
532             throw new IllegalArgumentException();
533         int n = 0;
534         for (E e; n < maxElements && (e = poll()) != null; n++)
535             c.add(e);
536         return n;
537     }
538 
539     /*
540      * To cope with serialization across multiple implementation
541      * overhauls, we declare some unused classes and fields that exist
542      * solely to enable serializability across versions.  These fields
543      * are never used, so are initialized only if this object is ever
544      * serialized. We use readResolve to replace a deserialized queue
545      * with a fresh one. Note that no queue elements are serialized,
546      * since any existing ones are only transient.
547      */
548 
549     @SuppressWarnings("serial")
550     static class WaitQueue implements java.io.Serializable { }
551     static class LifoWaitQueue extends WaitQueue {
552         private static final long serialVersionUID = -3633113410248163686L;
553     }
554     static class FifoWaitQueue extends WaitQueue {
555         private static final long serialVersionUID = -3623113410248163686L;
556     }
557     private ReentrantLock qlock;
558     private WaitQueue waitingProducers;
559     private WaitQueue waitingConsumers;
560 
561     /**
562      * Saves this queue to a stream (that is, serializes it).
563      * @param s the stream
564      * @throws java.io.IOException if an I/O error occurs
565      */
writeObject(java.io.ObjectOutputStream s)566     private void writeObject(java.io.ObjectOutputStream s)
567         throws java.io.IOException {
568         if (fair) {
569             qlock = new ReentrantLock(true);
570             waitingProducers = new FifoWaitQueue();
571             waitingConsumers = new FifoWaitQueue();
572         }
573         else {
574             qlock = new ReentrantLock();
575             waitingProducers = new LifoWaitQueue();
576             waitingConsumers = new LifoWaitQueue();
577         }
578         s.defaultWriteObject();
579     }
580 
581     /**
582      * Replaces a deserialized SynchronousQueue with a fresh one with
583      * the associated fairness
584      */
readResolve()585     private Object readResolve() {
586         return new SynchronousQueue<E>(waitingProducers instanceof FifoWaitQueue);
587     }
588 }
589