1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea, Bill Scherer, and Michael Scott with 32 * assistance from members of JCP JSR-166 Expert Group and released to 33 * the public domain, as explained at 34 * http://creativecommons.org/publicdomain/zero/1.0/ 35 */ 36 37 package java.util.concurrent; 38 39 import java.lang.invoke.MethodHandles; 40 import java.lang.invoke.VarHandle; 41 import java.util.AbstractQueue; 42 import java.util.Collection; 43 import java.util.Collections; 44 import java.util.Iterator; 45 import java.util.Objects; 46 import java.util.Spliterator; 47 import java.util.Spliterators; 48 import java.util.concurrent.locks.LockSupport; 49 import java.util.concurrent.locks.ReentrantLock; 50 import java.util.concurrent.ForkJoinWorkerThread; 51 import java.util.concurrent.LinkedTransferQueue; 52 import java.util.concurrent.TransferQueue; 53 54 /** 55 * A {@linkplain BlockingQueue blocking queue} in which each insert 56 * operation must wait for a corresponding remove operation by another 57 * thread, and vice versa. A synchronous queue does not have any 58 * internal capacity, not even a capacity of one. You cannot 59 * {@code peek} at a synchronous queue because an element is only 60 * present when you try to remove it; you cannot insert an element 61 * (using any method) unless another thread is trying to remove it; 62 * you cannot iterate as there is nothing to iterate. The 63 * <em>head</em> of the queue is the element that the first queued 64 * inserting thread is trying to add to the queue; if there is no such 65 * queued thread then no element is available for removal and 66 * {@code poll()} will return {@code null}. For purposes of other 67 * {@code Collection} methods (for example {@code contains}), a 68 * {@code SynchronousQueue} acts as an empty collection. This queue 69 * does not permit {@code null} elements. 70 * 71 * <p>Synchronous queues are similar to rendezvous channels used in 72 * CSP and Ada. They are well suited for handoff designs, in which an 73 * object running in one thread must sync up with an object running 74 * in another thread in order to hand it some information, event, or 75 * task. 76 * 77 * <p>This class supports an optional fairness policy for ordering 78 * waiting producer and consumer threads. By default, this ordering 79 * is not guaranteed. However, a queue constructed with fairness set 80 * to {@code true} grants threads access in FIFO order. 81 * 82 * <p>This class and its iterator implement all of the <em>optional</em> 83 * methods of the {@link Collection} and {@link Iterator} interfaces. 84 * 85 * <p>This class is a member of the 86 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework"> 87 * Java Collections Framework</a>. 88 * 89 * @since 1.5 90 * @author Doug Lea and Bill Scherer and Michael Scott 91 * @param <E> the type of elements held in this queue 92 */ 93 public class SynchronousQueue<E> extends AbstractQueue<E> 94 implements BlockingQueue<E>, java.io.Serializable { 95 private static final long serialVersionUID = -3223113410248163686L; 96 97 /* 98 * This class implements extensions of the dual stack and dual 99 * queue algorithms described in "Nonblocking Concurrent Objects 100 * with Condition Synchronization", by W. N. Scherer III and 101 * M. L. Scott. 18th Annual Conf. on Distributed Computing, 102 * Oct. 2004 (see also 103 * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html). 104 * The queue is treated as a Lifo stack in non-fair mode, and a 105 * Fifo queue in fair mode. In most contexts, transfer performance 106 * is roughly comparable across them. Lifo is usually faster under 107 * low contention, but slower under high contention. Performance 108 * of applications using them also varies. Lifo is generally 109 * preferable in resource management settings (for example cached 110 * thread pools) because of better temporal locality, but 111 * inappropriate for message-passing applications. 112 * 113 * A dual queue is one that at any given time either holds "data" 114 * -- items provided by put operations, or "requests" -- slots 115 * representing take operations, or is empty. A fulfilling 116 * operation (i.e., a call requesting an item from a queue holding 117 * data or vice versa) "matches" the item of and then dequeues a 118 * complementary node. Any operation can figure out which mode 119 * the queue is in, and act accordingly without needing locks. So 120 * put and take operations are symmetrical, and all transfer 121 * methods invoke a single "xfer" method that does a put or a take 122 * in either fifo or lifo mode. 123 * 124 * The algorithms here differ from the versions in the above paper 125 * in ways including: 126 * 127 * * The original algorithms used bit-marked pointers, but the 128 * ones here use a bit (isData) in nodes, and usually avoid 129 * creating nodes when fulfilling. They also use the 130 * compareAndExchange form of CAS for pointer updates to 131 * reduce memory traffic. 132 * * Fifo mode is based on LinkedTransferQueue operations, but 133 * Lifo mode support is added in subclass Transferer. 134 * * The Fifo version accommodates lazy updates and slack as 135 * described in LinkedTransferQueue internal documentation. 136 * * Threads may block when waiting to become fulfilled, 137 * sometimes preceded by brief spins. 138 * * Support for cancellation via timeout and interrupts, 139 * including cleaning out cancelled nodes/threads from lists 140 * to avoid garbage retention and memory depletion. 141 */ 142 143 /** 144 * Extension of LinkedTransferQueue to support Lifo (stack) mode. 145 * Methods use the "head" field as head (top) of stack (versus 146 * queue). Note that popped nodes are not self-linked because they 147 * are not prone to unbounded garbage chains. Also note that 148 * "async" mode is never used and not supported for synchronous 149 * transfers. 150 */ 151 @SuppressWarnings("serial") // never serialized 152 static final class Transferer<E> extends LinkedTransferQueue<E> { 153 154 /** 155 * Puts or takes an item with lifo ordering. Loops trying: 156 * * If top (var p) exists and is already matched, pop and continue 157 * * If top has complementary type, try to fulfill by CASing item, 158 * On success pop (which will succeed unless already helped), 159 * otherwise restart. 160 * * If no possible match, unless immediate mode, push a 161 * node and wait, later unsplicing if cancelled. 162 * 163 * @param e the item or null for take 164 * @param ns timeout or 0 if immediate, Long.MAX_VALUE if untimed 165 * @return an item if matched, else e 166 */ xferLifo(Object e, long ns)167 final Object xferLifo(Object e, long ns) { 168 boolean haveData = (e != null); 169 Object m; // the match or e if none 170 outer: for (DualNode s = null, p = head;;) { 171 while (p != null) { 172 boolean isData; DualNode n, u; // help collapse 173 if ((isData = p.isData) != ((m = p.item) != null)) 174 p = (p == (u = cmpExHead(p, (n = p.next)))) ? n : u; 175 else if (isData == haveData) // same mode; push below 176 break; 177 else if (p.cmpExItem(m, e) != m) 178 p = head; // missed; restart 179 else { // matched complementary node 180 Thread w = p.waiter; 181 cmpExHead(p, p.next); 182 LockSupport.unpark(w); 183 break outer; 184 } 185 } 186 if (ns == 0L) { // no match, no wait 187 m = e; 188 break; 189 } 190 if (s == null) // try to push node and wait 191 s = new DualNode(e, haveData); 192 s.next = p; 193 if (p == (p = cmpExHead(p, s))) { 194 if ((m = s.await(e, ns, this, // spin if (nearly) empty 195 p == null || p.waiter == null)) == e) 196 unspliceLifo(s); // cancelled 197 break; 198 } 199 } 200 return m; 201 } 202 203 /** 204 * Unlinks node s. Same idea as Fifo version. 205 */ unspliceLifo(DualNode s)206 private void unspliceLifo(DualNode s) { 207 boolean seen = false; // try removing by collapsing head 208 DualNode p = head; 209 for (DualNode f, u; p != null && p.matched();) { 210 if (p == s) 211 seen = true; 212 p = (p == (u = cmpExHead(p, (f = p.next)))) ? f : u; 213 } 214 if (p != null && !seen && sweepNow()) { // occasionally sweep 215 for (DualNode f, n, u; p != null && (f = p.next) != null; ) { 216 p = (!f.matched() ? f : 217 f == (u = p.cmpExNext(f, n = f.next)) ? n : u); 218 } 219 } 220 } 221 } 222 223 /** 224 * The transferer. (See below about serialization.) 225 */ 226 private final transient Transferer<E> transferer; 227 228 private final transient boolean fair; 229 230 /** Invokes fair or lifo transfer */ xfer(Object e, long nanos)231 private Object xfer(Object e, long nanos) { 232 Transferer<E> x = transferer; 233 return (fair) ? x.xfer(e, nanos) : x.xferLifo(e, nanos); 234 } 235 236 /** 237 * Creates a {@code SynchronousQueue} with nonfair access policy. 238 */ SynchronousQueue()239 public SynchronousQueue() { 240 this(false); 241 } 242 243 /** 244 * Creates a {@code SynchronousQueue} with the specified fairness policy. 245 * 246 * @param fair if true, waiting threads contend in FIFO order for 247 * access; otherwise the order is unspecified. 248 */ SynchronousQueue(boolean fair)249 public SynchronousQueue(boolean fair) { 250 this.fair = fair; 251 transferer = new Transferer<E>(); 252 } 253 254 /** 255 * Adds the specified element to this queue, waiting if necessary for 256 * another thread to receive it. 257 * 258 * @throws InterruptedException {@inheritDoc} 259 * @throws NullPointerException {@inheritDoc} 260 */ put(E e)261 public void put(E e) throws InterruptedException { 262 Objects.requireNonNull(e); 263 if (!Thread.interrupted()) { 264 if (xfer(e, Long.MAX_VALUE) == null) 265 return; 266 Thread.interrupted(); // failure possible only due to interrupt 267 } 268 throw new InterruptedException(); 269 } 270 271 /** 272 * Inserts the specified element into this queue, waiting if necessary 273 * up to the specified wait time for another thread to receive it. 274 * 275 * @return {@code true} if successful, or {@code false} if the 276 * specified waiting time elapses before a consumer appears 277 * @throws InterruptedException {@inheritDoc} 278 * @throws NullPointerException {@inheritDoc} 279 */ offer(E e, long timeout, TimeUnit unit)280 public boolean offer(E e, long timeout, TimeUnit unit) 281 throws InterruptedException { 282 Objects.requireNonNull(e); 283 long nanos = Math.max(unit.toNanos(timeout), 0L); 284 if (xfer(e, nanos) == null) 285 return true; 286 if (!Thread.interrupted()) 287 return false; 288 throw new InterruptedException(); 289 } 290 291 /** 292 * Inserts the specified element into this queue, if another thread is 293 * waiting to receive it. 294 * 295 * @param e the element to add 296 * @return {@code true} if the element was added to this queue, else 297 * {@code false} 298 * @throws NullPointerException if the specified element is null 299 */ offer(E e)300 public boolean offer(E e) { 301 Objects.requireNonNull(e); 302 return xfer(e, 0L) == null; 303 } 304 305 /** 306 * Retrieves and removes the head of this queue, waiting if necessary 307 * for another thread to insert it. 308 * 309 * @return the head of this queue 310 * @throws InterruptedException {@inheritDoc} 311 */ 312 @SuppressWarnings("unchecked") take()313 public E take() throws InterruptedException { 314 Object e; 315 if (!Thread.interrupted()) { 316 if ((e = xfer(null, Long.MAX_VALUE)) != null) 317 return (E) e; 318 Thread.interrupted(); 319 } 320 throw new InterruptedException(); 321 } 322 323 /** 324 * Retrieves and removes the head of this queue, waiting 325 * if necessary up to the specified wait time, for another thread 326 * to insert it. 327 * 328 * @return the head of this queue, or {@code null} if the 329 * specified waiting time elapses before an element is present 330 * @throws InterruptedException {@inheritDoc} 331 */ 332 @SuppressWarnings("unchecked") poll(long timeout, TimeUnit unit)333 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 334 Object e; 335 long nanos = Math.max(unit.toNanos(timeout), 0L); 336 if ((e = xfer(null, nanos)) != null || !Thread.interrupted()) 337 return (E) e; 338 throw new InterruptedException(); 339 } 340 341 /** 342 * Retrieves and removes the head of this queue, if another thread 343 * is currently making an element available. 344 * 345 * @return the head of this queue, or {@code null} if no 346 * element is available 347 */ 348 @SuppressWarnings("unchecked") poll()349 public E poll() { 350 return (E) xfer(null, 0L); 351 } 352 353 /** 354 * Always returns {@code true}. 355 * A {@code SynchronousQueue} has no internal capacity. 356 * 357 * @return {@code true} 358 */ isEmpty()359 public boolean isEmpty() { 360 return true; 361 } 362 363 /** 364 * Always returns zero. 365 * A {@code SynchronousQueue} has no internal capacity. 366 * 367 * @return zero 368 */ size()369 public int size() { 370 return 0; 371 } 372 373 /** 374 * Always returns zero. 375 * A {@code SynchronousQueue} has no internal capacity. 376 * 377 * @return zero 378 */ remainingCapacity()379 public int remainingCapacity() { 380 return 0; 381 } 382 383 /** 384 * Does nothing. 385 * A {@code SynchronousQueue} has no internal capacity. 386 */ clear()387 public void clear() { 388 } 389 390 /** 391 * Always returns {@code false}. 392 * A {@code SynchronousQueue} has no internal capacity. 393 * 394 * @param o the element 395 * @return {@code false} 396 */ contains(Object o)397 public boolean contains(Object o) { 398 return false; 399 } 400 401 /** 402 * Always returns {@code false}. 403 * A {@code SynchronousQueue} has no internal capacity. 404 * 405 * @param o the element to remove 406 * @return {@code false} 407 */ remove(Object o)408 public boolean remove(Object o) { 409 return false; 410 } 411 412 /** 413 * Returns {@code false} unless the given collection is empty. 414 * A {@code SynchronousQueue} has no internal capacity. 415 * 416 * @param c the collection 417 * @return {@code false} unless given collection is empty 418 */ containsAll(Collection<?> c)419 public boolean containsAll(Collection<?> c) { 420 return c.isEmpty(); 421 } 422 423 /** 424 * Always returns {@code false}. 425 * A {@code SynchronousQueue} has no internal capacity. 426 * 427 * @param c the collection 428 * @return {@code false} 429 */ removeAll(Collection<?> c)430 public boolean removeAll(Collection<?> c) { 431 return false; 432 } 433 434 /** 435 * Always returns {@code false}. 436 * A {@code SynchronousQueue} has no internal capacity. 437 * 438 * @param c the collection 439 * @return {@code false} 440 */ retainAll(Collection<?> c)441 public boolean retainAll(Collection<?> c) { 442 return false; 443 } 444 445 /** 446 * Always returns {@code null}. 447 * A {@code SynchronousQueue} does not return elements 448 * unless actively waited on. 449 * 450 * @return {@code null} 451 */ peek()452 public E peek() { 453 return null; 454 } 455 456 /** 457 * Returns an empty iterator in which {@code hasNext} always returns 458 * {@code false}. 459 * 460 * @return an empty iterator 461 */ iterator()462 public Iterator<E> iterator() { 463 return Collections.emptyIterator(); 464 } 465 466 /** 467 * Returns an empty spliterator in which calls to 468 * {@link Spliterator#trySplit() trySplit} always return {@code null}. 469 * 470 * @return an empty spliterator 471 * @since 1.8 472 */ spliterator()473 public Spliterator<E> spliterator() { 474 return Spliterators.emptySpliterator(); 475 } 476 477 /** 478 * Returns a zero-length array. 479 * @return a zero-length array 480 */ toArray()481 public Object[] toArray() { 482 return new Object[0]; 483 } 484 485 /** 486 * Sets the zeroth element of the specified array to {@code null} 487 * (if the array has non-zero length) and returns it. 488 * 489 * @param a the array 490 * @return the specified array 491 * @throws NullPointerException if the specified array is null 492 */ toArray(T[] a)493 public <T> T[] toArray(T[] a) { 494 if (a.length > 0) 495 a[0] = null; 496 return a; 497 } 498 499 /** 500 * Always returns {@code "[]"}. 501 * @return {@code "[]"} 502 */ toString()503 public String toString() { 504 return "[]"; 505 } 506 507 /** 508 * @throws UnsupportedOperationException {@inheritDoc} 509 * @throws ClassCastException {@inheritDoc} 510 * @throws NullPointerException {@inheritDoc} 511 * @throws IllegalArgumentException {@inheritDoc} 512 */ drainTo(Collection<? super E> c)513 public int drainTo(Collection<? super E> c) { 514 Objects.requireNonNull(c); 515 if (c == this) 516 throw new IllegalArgumentException(); 517 int n = 0; 518 for (E e; (e = poll()) != null; n++) 519 c.add(e); 520 return n; 521 } 522 523 /** 524 * @throws UnsupportedOperationException {@inheritDoc} 525 * @throws ClassCastException {@inheritDoc} 526 * @throws NullPointerException {@inheritDoc} 527 * @throws IllegalArgumentException {@inheritDoc} 528 */ drainTo(Collection<? super E> c, int maxElements)529 public int drainTo(Collection<? super E> c, int maxElements) { 530 Objects.requireNonNull(c); 531 if (c == this) 532 throw new IllegalArgumentException(); 533 int n = 0; 534 for (E e; n < maxElements && (e = poll()) != null; n++) 535 c.add(e); 536 return n; 537 } 538 539 /* 540 * To cope with serialization across multiple implementation 541 * overhauls, we declare some unused classes and fields that exist 542 * solely to enable serializability across versions. These fields 543 * are never used, so are initialized only if this object is ever 544 * serialized. We use readResolve to replace a deserialized queue 545 * with a fresh one. Note that no queue elements are serialized, 546 * since any existing ones are only transient. 547 */ 548 549 @SuppressWarnings("serial") 550 static class WaitQueue implements java.io.Serializable { } 551 static class LifoWaitQueue extends WaitQueue { 552 private static final long serialVersionUID = -3633113410248163686L; 553 } 554 static class FifoWaitQueue extends WaitQueue { 555 private static final long serialVersionUID = -3623113410248163686L; 556 } 557 private ReentrantLock qlock; 558 private WaitQueue waitingProducers; 559 private WaitQueue waitingConsumers; 560 561 /** 562 * Saves this queue to a stream (that is, serializes it). 563 * @param s the stream 564 * @throws java.io.IOException if an I/O error occurs 565 */ writeObject(java.io.ObjectOutputStream s)566 private void writeObject(java.io.ObjectOutputStream s) 567 throws java.io.IOException { 568 if (fair) { 569 qlock = new ReentrantLock(true); 570 waitingProducers = new FifoWaitQueue(); 571 waitingConsumers = new FifoWaitQueue(); 572 } 573 else { 574 qlock = new ReentrantLock(); 575 waitingProducers = new LifoWaitQueue(); 576 waitingConsumers = new LifoWaitQueue(); 577 } 578 s.defaultWriteObject(); 579 } 580 581 /** 582 * Replaces a deserialized SynchronousQueue with a fresh one with 583 * the associated fairness 584 */ readResolve()585 private Object readResolve() { 586 return new SynchronousQueue<E>(waitingProducers instanceof FifoWaitQueue); 587 } 588 } 589