1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent.locks; 37 38 import java.util.ArrayList; 39 import java.util.Collection; 40 import java.util.Date; 41 import java.util.concurrent.TimeUnit; 42 import java.util.concurrent.ForkJoinPool; 43 import java.util.concurrent.RejectedExecutionException; 44 import jdk.internal.misc.Unsafe; 45 46 /** 47 * A version of {@link AbstractQueuedSynchronizer} in 48 * which synchronization state is maintained as a {@code long}. 49 * This class has exactly the same structure, properties, and methods 50 * as {@code AbstractQueuedSynchronizer} with the exception 51 * that all state-related parameters and results are defined 52 * as {@code long} rather than {@code int}. This class 53 * may be useful when creating synchronizers such as 54 * multilevel locks and barriers that require 55 * 64 bits of state. 56 * 57 * <p>See {@link AbstractQueuedSynchronizer} for usage 58 * notes and examples. 59 * 60 * @since 1.6 61 * @author Doug Lea 62 */ 63 public abstract class AbstractQueuedLongSynchronizer 64 extends AbstractOwnableSynchronizer 65 implements java.io.Serializable { 66 67 private static final long serialVersionUID = 7373984972572414692L; 68 69 /** 70 * Constructor for subclasses to call. 71 */ 72 // Android-changed: Keep the constructor protected. 73 // public AbstractQueuedLongSynchronizer() {} AbstractQueuedLongSynchronizer()74 protected AbstractQueuedLongSynchronizer() {} 75 76 /* 77 * To keep sources in sync, the remainder of this source file is 78 * exactly cloned from AbstractQueuedSynchronizer, replacing class 79 * name and changing ints related with sync state to longs. Please 80 * keep it that way. 81 */ 82 83 // Node status bits, also used as argument and return values 84 static final int WAITING = 1; // must be 1 85 static final int CANCELLED = 0x80000000; // must be negative 86 static final int COND = 2; // in a condition wait 87 88 /** CLH Nodes */ 89 abstract static class Node { 90 volatile Node prev; // initially attached via casTail 91 volatile Node next; // visibly nonnull when signallable 92 Thread waiter; // visibly nonnull when enqueued 93 volatile int status; // written by owner, atomic bit ops by others 94 95 // methods for atomic operations casPrev(Node c, Node v)96 final boolean casPrev(Node c, Node v) { // for cleanQueue 97 return U.weakCompareAndSetReference(this, PREV, c, v); 98 } casNext(Node c, Node v)99 final boolean casNext(Node c, Node v) { // for cleanQueue 100 return U.weakCompareAndSetReference(this, NEXT, c, v); 101 } getAndUnsetStatus(int v)102 final int getAndUnsetStatus(int v) { // for signalling 103 return U.getAndBitwiseAndInt(this, STATUS, ~v); 104 } setPrevRelaxed(Node p)105 final void setPrevRelaxed(Node p) { // for off-queue assignment 106 U.putReference(this, PREV, p); 107 } setStatusRelaxed(int s)108 final void setStatusRelaxed(int s) { // for off-queue assignment 109 U.putInt(this, STATUS, s); 110 } clearStatus()111 final void clearStatus() { // for reducing unneeded signals 112 U.putIntOpaque(this, STATUS, 0); 113 } 114 115 private static final long STATUS 116 = U.objectFieldOffset(Node.class, "status"); 117 private static final long NEXT 118 = U.objectFieldOffset(Node.class, "next"); 119 private static final long PREV 120 = U.objectFieldOffset(Node.class, "prev"); 121 } 122 123 // Concrete classes tagged by type 124 static final class ExclusiveNode extends Node { } 125 static final class SharedNode extends Node { } 126 127 static final class ConditionNode extends Node 128 implements ForkJoinPool.ManagedBlocker { 129 ConditionNode nextWaiter; // link to next waiting node 130 131 /** 132 * Allows Conditions to be used in ForkJoinPools without 133 * risking fixed pool exhaustion. This is usable only for 134 * untimed Condition waits, not timed versions. 135 */ isReleasable()136 public final boolean isReleasable() { 137 return status <= 1 || Thread.currentThread().isInterrupted(); 138 } 139 block()140 public final boolean block() { 141 while (!isReleasable()) LockSupport.park(); 142 return true; 143 } 144 } 145 146 /** 147 * Head of the wait queue, lazily initialized. 148 */ 149 private transient volatile Node head; 150 151 /** 152 * Tail of the wait queue. After initialization, modified only via casTail. 153 */ 154 private transient volatile Node tail; 155 156 /** 157 * The synchronization state. 158 */ 159 private volatile long state; 160 161 /** 162 * Returns the current value of synchronization state. 163 * This operation has memory semantics of a {@code volatile} read. 164 * @return current state value 165 */ getState()166 protected final long getState() { 167 return state; 168 } 169 170 /** 171 * Sets the value of synchronization state. 172 * This operation has memory semantics of a {@code volatile} write. 173 * @param newState the new state value 174 */ setState(long newState)175 protected final void setState(long newState) { 176 state = newState; 177 } 178 179 /** 180 * Atomically sets synchronization state to the given updated 181 * value if the current state value equals the expected value. 182 * This operation has memory semantics of a {@code volatile} read 183 * and write. 184 * 185 * @param expect the expected value 186 * @param update the new value 187 * @return {@code true} if successful. False return indicates that the actual 188 * value was not equal to the expected value. 189 */ compareAndSetState(long expect, long update)190 protected final boolean compareAndSetState(long expect, long update) { 191 return U.compareAndSetLong(this, STATE, expect, update); 192 } 193 194 // Queuing utilities 195 casTail(Node c, Node v)196 private boolean casTail(Node c, Node v) { 197 return U.compareAndSetReference(this, TAIL, c, v); 198 } 199 200 /** 201 * Tries to CAS a new dummy node for head. 202 * Returns new tail, or null if OutOfMemory 203 */ tryInitializeHead()204 private Node tryInitializeHead() { 205 for (Node h = null, t;;) { 206 if ((t = tail) != null) 207 return t; 208 else if (head != null) 209 Thread.onSpinWait(); 210 else { 211 if (h == null) { 212 try { 213 h = new ExclusiveNode(); 214 } catch (OutOfMemoryError oome) { 215 return null; 216 } 217 } 218 if (U.compareAndSetReference(this, HEAD, null, h)) 219 return tail = h; 220 } 221 } 222 } 223 224 225 /** 226 * Enqueues the node unless null. (Currently used only for 227 * ConditionNodes; other cases are interleaved with acquires.) 228 */ enqueue(ConditionNode node)229 final void enqueue(ConditionNode node) { 230 if (node != null) { 231 boolean unpark = false; 232 for (Node t;;) { 233 if ((t = tail) == null && (t = tryInitializeHead()) == null) { 234 unpark = true; // wake up to spin on OOME 235 break; 236 } 237 node.setPrevRelaxed(t); // avoid unnecessary fence 238 if (casTail(t, node)) { 239 t.next = node; 240 if (t.status < 0) // wake up to clean link 241 unpark = true; 242 break; 243 } 244 } 245 if (unpark) 246 LockSupport.unpark(node.waiter); 247 } 248 } 249 250 /** Returns true if node is found in traversal from tail */ isEnqueued(Node node)251 final boolean isEnqueued(Node node) { 252 for (Node t = tail; t != null; t = t.prev) 253 if (t == node) 254 return true; 255 return false; 256 } 257 258 /** 259 * Wakes up the successor of given node, if one exists, and unsets its 260 * WAITING status to avoid park race. This may fail to wake up an 261 * eligible thread when one or more have been cancelled, but 262 * cancelAcquire ensures liveness. 263 */ signalNext(Node h)264 private static void signalNext(Node h) { 265 Node s; 266 if (h != null && (s = h.next) != null && s.status != 0) { 267 s.getAndUnsetStatus(WAITING); 268 LockSupport.unpark(s.waiter); 269 } 270 } 271 272 /** Wakes up the given node if in shared mode */ signalNextIfShared(Node h)273 private static void signalNextIfShared(Node h) { 274 Node s; 275 if (h != null && (s = h.next) != null && 276 (s instanceof SharedNode) && s.status != 0) { 277 s.getAndUnsetStatus(WAITING); 278 LockSupport.unpark(s.waiter); 279 } 280 } 281 282 /** 283 * Main acquire method, invoked by all exported acquire methods. 284 * 285 * @param node null unless a reacquiring Condition 286 * @param arg the acquire argument 287 * @param shared true if shared mode else exclusive 288 * @param interruptible if abort and return negative on interrupt 289 * @param timed if true use timed waits 290 * @param time if timed, the System.nanoTime value to timeout 291 * @return positive if acquired, 0 if timed out, negative if interrupted 292 */ acquire(Node node, long arg, boolean shared, boolean interruptible, boolean timed, long time)293 final int acquire(Node node, long arg, boolean shared, 294 boolean interruptible, boolean timed, long time) { 295 Thread current = Thread.currentThread(); 296 byte spins = 0, postSpins = 0; // retries upon unpark of first thread 297 boolean interrupted = false, first = false; 298 Node pred = null; // predecessor of node when enqueued 299 300 /* 301 * Repeatedly: 302 * Check if node now first 303 * if so, ensure head stable, else ensure valid predecessor 304 * if node is first or not yet enqueued, try acquiring 305 * else if queue is not initialized, do so by attaching new header node 306 * resort to spinwait on OOME trying to create node 307 * else if node not yet created, create it 308 * resort to spinwait on OOME trying to create node 309 * else if not yet enqueued, try once to enqueue 310 * else if woken from park, retry (up to postSpins times) 311 * else if WAITING status not set, set and retry 312 * else park and clear WAITING status, and check cancellation 313 */ 314 315 for (;;) { 316 if (!first && (pred = (node == null) ? null : node.prev) != null && 317 !(first = (head == pred))) { 318 if (pred.status < 0) { 319 cleanQueue(); // predecessor cancelled 320 continue; 321 } else if (pred.prev == null) { 322 Thread.onSpinWait(); // ensure serialization 323 continue; 324 } 325 } 326 if (first || pred == null) { 327 boolean acquired; 328 try { 329 if (shared) 330 acquired = (tryAcquireShared(arg) >= 0); 331 else 332 acquired = tryAcquire(arg); 333 } catch (Throwable ex) { 334 cancelAcquire(node, interrupted, false); 335 throw ex; 336 } 337 if (acquired) { 338 if (first) { 339 node.prev = null; 340 head = node; 341 pred.next = null; 342 node.waiter = null; 343 if (shared) 344 signalNextIfShared(node); 345 if (interrupted) 346 current.interrupt(); 347 } 348 return 1; 349 } 350 } 351 Node t; 352 if ((t = tail) == null) { // initialize queue 353 if (tryInitializeHead() == null) 354 return acquireOnOOME(shared, arg); 355 } else if (node == null) { // allocate; retry before enqueue 356 try { 357 node = (shared) ? new SharedNode() : new ExclusiveNode(); 358 } catch (OutOfMemoryError oome) { 359 return acquireOnOOME(shared, arg); 360 } 361 } else if (pred == null) { // try to enqueue 362 node.waiter = current; 363 node.setPrevRelaxed(t); // avoid unnecessary fence 364 if (!casTail(t, node)) 365 node.setPrevRelaxed(null); // back out 366 else 367 t.next = node; 368 } else if (first && spins != 0) { 369 --spins; // reduce unfairness on rewaits 370 Thread.onSpinWait(); 371 } else if (node.status == 0) { 372 node.status = WAITING; // enable signal and recheck 373 } else { 374 long nanos; 375 spins = postSpins = (byte)((postSpins << 1) | 1); 376 if (!timed) 377 LockSupport.park(this); 378 else if ((nanos = time - System.nanoTime()) > 0L) 379 LockSupport.parkNanos(this, nanos); 380 else 381 break; 382 node.clearStatus(); 383 if ((interrupted |= Thread.interrupted()) && interruptible) 384 break; 385 } 386 } 387 return cancelAcquire(node, interrupted, interruptible); 388 } 389 390 /** 391 * Spin-waits with backoff; used only upon OOME failures during acquire. 392 */ acquireOnOOME(boolean shared, long arg)393 private int acquireOnOOME(boolean shared, long arg) { 394 for (long nanos = 1L;;) { 395 if (shared ? (tryAcquireShared(arg) >= 0) : tryAcquire(arg)) 396 return 1; 397 U.park(false, nanos); // must use Unsafe park to sleep 398 if (nanos < 1L << 30) // max about 1 second 399 nanos <<= 1; 400 } 401 } 402 403 /** 404 * Possibly repeatedly traverses from tail, unsplicing cancelled 405 * nodes until none are found. Unparks nodes that may have been 406 * relinked to be next eligible acquirer. 407 */ cleanQueue()408 private void cleanQueue() { 409 for (;;) { // restart point 410 for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples 411 if (q == null || (p = q.prev) == null) 412 return; // end of list 413 if (s == null ? tail != q : (s.prev != q || s.status < 0)) 414 break; // inconsistent 415 if (q.status < 0) { // cancelled 416 if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && 417 q.prev == p) { 418 p.casNext(q, s); // OK if fails 419 if (p.prev == null) 420 signalNext(p); 421 } 422 break; 423 } 424 if ((n = p.next) != q) { // help finish 425 if (n != null && q.prev == p) { 426 p.casNext(n, q); 427 if (p.prev == null) 428 signalNext(p); 429 } 430 break; 431 } 432 s = q; 433 q = q.prev; 434 } 435 } 436 } 437 438 /** 439 * Cancels an ongoing attempt to acquire. 440 * 441 * @param node the node (may be null if cancelled before enqueuing) 442 * @param interrupted true if thread interrupted 443 * @param interruptible if should report interruption vs reset 444 */ cancelAcquire(Node node, boolean interrupted, boolean interruptible)445 private int cancelAcquire(Node node, boolean interrupted, 446 boolean interruptible) { 447 if (node != null) { 448 node.waiter = null; 449 node.status = CANCELLED; 450 if (node.prev != null) 451 cleanQueue(); 452 } 453 if (interrupted) { 454 if (interruptible) 455 return CANCELLED; 456 else 457 Thread.currentThread().interrupt(); 458 } 459 return 0; 460 } 461 462 // Main exported methods 463 464 /** 465 * Attempts to acquire in exclusive mode. This method should query 466 * if the state of the object permits it to be acquired in the 467 * exclusive mode, and if so to acquire it. 468 * 469 * <p>This method is always invoked by the thread performing 470 * acquire. If this method reports failure, the acquire method 471 * may queue the thread, if it is not already queued, until it is 472 * signalled by a release from some other thread. This can be used 473 * to implement method {@link Lock#tryLock()}. 474 * 475 * <p>The default 476 * implementation throws {@link UnsupportedOperationException}. 477 * 478 * @param arg the acquire argument. This value is always the one 479 * passed to an acquire method, or is the value saved on entry 480 * to a condition wait. The value is otherwise uninterpreted 481 * and can represent anything you like. 482 * @return {@code true} if successful. Upon success, this object has 483 * been acquired. 484 * @throws IllegalMonitorStateException if acquiring would place this 485 * synchronizer in an illegal state. This exception must be 486 * thrown in a consistent fashion for synchronization to work 487 * correctly. 488 * @throws UnsupportedOperationException if exclusive mode is not supported 489 */ tryAcquire(long arg)490 protected boolean tryAcquire(long arg) { 491 throw new UnsupportedOperationException(); 492 } 493 494 /** 495 * Attempts to set the state to reflect a release in exclusive 496 * mode. 497 * 498 * <p>This method is always invoked by the thread performing release. 499 * 500 * <p>The default implementation throws 501 * {@link UnsupportedOperationException}. 502 * 503 * @param arg the release argument. This value is always the one 504 * passed to a release method, or the current state value upon 505 * entry to a condition wait. The value is otherwise 506 * uninterpreted and can represent anything you like. 507 * @return {@code true} if this object is now in a fully released 508 * state, so that any waiting threads may attempt to acquire; 509 * and {@code false} otherwise. 510 * @throws IllegalMonitorStateException if releasing would place this 511 * synchronizer in an illegal state. This exception must be 512 * thrown in a consistent fashion for synchronization to work 513 * correctly. 514 * @throws UnsupportedOperationException if exclusive mode is not supported 515 */ tryRelease(long arg)516 protected boolean tryRelease(long arg) { 517 throw new UnsupportedOperationException(); 518 } 519 520 /** 521 * Attempts to acquire in shared mode. This method should query if 522 * the state of the object permits it to be acquired in the shared 523 * mode, and if so to acquire it. 524 * 525 * <p>This method is always invoked by the thread performing 526 * acquire. If this method reports failure, the acquire method 527 * may queue the thread, if it is not already queued, until it is 528 * signalled by a release from some other thread. 529 * 530 * <p>The default implementation throws {@link 531 * UnsupportedOperationException}. 532 * 533 * @param arg the acquire argument. This value is always the one 534 * passed to an acquire method, or is the value saved on entry 535 * to a condition wait. The value is otherwise uninterpreted 536 * and can represent anything you like. 537 * @return a negative value on failure; zero if acquisition in shared 538 * mode succeeded but no subsequent shared-mode acquire can 539 * succeed; and a positive value if acquisition in shared 540 * mode succeeded and subsequent shared-mode acquires might 541 * also succeed, in which case a subsequent waiting thread 542 * must check availability. (Support for three different 543 * return values enables this method to be used in contexts 544 * where acquires only sometimes act exclusively.) Upon 545 * success, this object has been acquired. 546 * @throws IllegalMonitorStateException if acquiring would place this 547 * synchronizer in an illegal state. This exception must be 548 * thrown in a consistent fashion for synchronization to work 549 * correctly. 550 * @throws UnsupportedOperationException if shared mode is not supported 551 */ tryAcquireShared(long arg)552 protected long tryAcquireShared(long arg) { 553 throw new UnsupportedOperationException(); 554 } 555 556 /** 557 * Attempts to set the state to reflect a release in shared mode. 558 * 559 * <p>This method is always invoked by the thread performing release. 560 * 561 * <p>The default implementation throws 562 * {@link UnsupportedOperationException}. 563 * 564 * @param arg the release argument. This value is always the one 565 * passed to a release method, or the current state value upon 566 * entry to a condition wait. The value is otherwise 567 * uninterpreted and can represent anything you like. 568 * @return {@code true} if this release of shared mode may permit a 569 * waiting acquire (shared or exclusive) to succeed; and 570 * {@code false} otherwise 571 * @throws IllegalMonitorStateException if releasing would place this 572 * synchronizer in an illegal state. This exception must be 573 * thrown in a consistent fashion for synchronization to work 574 * correctly. 575 * @throws UnsupportedOperationException if shared mode is not supported 576 */ tryReleaseShared(long arg)577 protected boolean tryReleaseShared(long arg) { 578 throw new UnsupportedOperationException(); 579 } 580 581 /** 582 * Returns {@code true} if synchronization is held exclusively with 583 * respect to the current (calling) thread. This method is invoked 584 * upon each call to a {@link ConditionObject} method. 585 * 586 * <p>The default implementation throws {@link 587 * UnsupportedOperationException}. This method is invoked 588 * internally only within {@link ConditionObject} methods, so need 589 * not be defined if conditions are not used. 590 * 591 * @return {@code true} if synchronization is held exclusively; 592 * {@code false} otherwise 593 * @throws UnsupportedOperationException if conditions are not supported 594 */ isHeldExclusively()595 protected boolean isHeldExclusively() { 596 throw new UnsupportedOperationException(); 597 } 598 599 /** 600 * Acquires in exclusive mode, ignoring interrupts. Implemented 601 * by invoking at least once {@link #tryAcquire}, 602 * returning on success. Otherwise the thread is queued, possibly 603 * repeatedly blocking and unblocking, invoking {@link 604 * #tryAcquire} until success. This method can be used 605 * to implement method {@link Lock#lock}. 606 * 607 * @param arg the acquire argument. This value is conveyed to 608 * {@link #tryAcquire} but is otherwise uninterpreted and 609 * can represent anything you like. 610 */ acquire(long arg)611 public final void acquire(long arg) { 612 if (!tryAcquire(arg)) 613 acquire(null, arg, false, false, false, 0L); 614 } 615 616 /** 617 * Acquires in exclusive mode, aborting if interrupted. 618 * Implemented by first checking interrupt status, then invoking 619 * at least once {@link #tryAcquire}, returning on 620 * success. Otherwise the thread is queued, possibly repeatedly 621 * blocking and unblocking, invoking {@link #tryAcquire} 622 * until success or the thread is interrupted. This method can be 623 * used to implement method {@link Lock#lockInterruptibly}. 624 * 625 * @param arg the acquire argument. This value is conveyed to 626 * {@link #tryAcquire} but is otherwise uninterpreted and 627 * can represent anything you like. 628 * @throws InterruptedException if the current thread is interrupted 629 */ acquireInterruptibly(long arg)630 public final void acquireInterruptibly(long arg) 631 throws InterruptedException { 632 if (Thread.interrupted() || 633 (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) 634 throw new InterruptedException(); 635 } 636 637 /** 638 * Attempts to acquire in exclusive mode, aborting if interrupted, 639 * and failing if the given timeout elapses. Implemented by first 640 * checking interrupt status, then invoking at least once {@link 641 * #tryAcquire}, returning on success. Otherwise, the thread is 642 * queued, possibly repeatedly blocking and unblocking, invoking 643 * {@link #tryAcquire} until success or the thread is interrupted 644 * or the timeout elapses. This method can be used to implement 645 * method {@link Lock#tryLock(long, TimeUnit)}. 646 * 647 * @param arg the acquire argument. This value is conveyed to 648 * {@link #tryAcquire} but is otherwise uninterpreted and 649 * can represent anything you like. 650 * @param nanosTimeout the maximum number of nanoseconds to wait 651 * @return {@code true} if acquired; {@code false} if timed out 652 * @throws InterruptedException if the current thread is interrupted 653 */ tryAcquireNanos(long arg, long nanosTimeout)654 public final boolean tryAcquireNanos(long arg, long nanosTimeout) 655 throws InterruptedException { 656 if (!Thread.interrupted()) { 657 if (tryAcquire(arg)) 658 return true; 659 if (nanosTimeout <= 0L) 660 return false; 661 int stat = acquire(null, arg, false, true, true, 662 System.nanoTime() + nanosTimeout); 663 if (stat > 0) 664 return true; 665 if (stat == 0) 666 return false; 667 } 668 throw new InterruptedException(); 669 } 670 671 /** 672 * Releases in exclusive mode. Implemented by unblocking one or 673 * more threads if {@link #tryRelease} returns true. 674 * This method can be used to implement method {@link Lock#unlock}. 675 * 676 * @param arg the release argument. This value is conveyed to 677 * {@link #tryRelease} but is otherwise uninterpreted and 678 * can represent anything you like. 679 * @return the value returned from {@link #tryRelease} 680 */ release(long arg)681 public final boolean release(long arg) { 682 if (tryRelease(arg)) { 683 signalNext(head); 684 return true; 685 } 686 return false; 687 } 688 689 /** 690 * Acquires in shared mode, ignoring interrupts. Implemented by 691 * first invoking at least once {@link #tryAcquireShared}, 692 * returning on success. Otherwise the thread is queued, possibly 693 * repeatedly blocking and unblocking, invoking {@link 694 * #tryAcquireShared} until success. 695 * 696 * @param arg the acquire argument. This value is conveyed to 697 * {@link #tryAcquireShared} but is otherwise uninterpreted 698 * and can represent anything you like. 699 */ acquireShared(long arg)700 public final void acquireShared(long arg) { 701 if (tryAcquireShared(arg) < 0) 702 acquire(null, arg, true, false, false, 0L); 703 } 704 705 /** 706 * Acquires in shared mode, aborting if interrupted. Implemented 707 * by first checking interrupt status, then invoking at least once 708 * {@link #tryAcquireShared}, returning on success. Otherwise the 709 * thread is queued, possibly repeatedly blocking and unblocking, 710 * invoking {@link #tryAcquireShared} until success or the thread 711 * is interrupted. 712 * @param arg the acquire argument. 713 * This value is conveyed to {@link #tryAcquireShared} but is 714 * otherwise uninterpreted and can represent anything 715 * you like. 716 * @throws InterruptedException if the current thread is interrupted 717 */ acquireSharedInterruptibly(long arg)718 public final void acquireSharedInterruptibly(long arg) 719 throws InterruptedException { 720 if (Thread.interrupted() || 721 (tryAcquireShared(arg) < 0 && 722 acquire(null, arg, true, true, false, 0L) < 0)) 723 throw new InterruptedException(); 724 } 725 726 /** 727 * Attempts to acquire in shared mode, aborting if interrupted, and 728 * failing if the given timeout elapses. Implemented by first 729 * checking interrupt status, then invoking at least once {@link 730 * #tryAcquireShared}, returning on success. Otherwise, the 731 * thread is queued, possibly repeatedly blocking and unblocking, 732 * invoking {@link #tryAcquireShared} until success or the thread 733 * is interrupted or the timeout elapses. 734 * 735 * @param arg the acquire argument. This value is conveyed to 736 * {@link #tryAcquireShared} but is otherwise uninterpreted 737 * and can represent anything you like. 738 * @param nanosTimeout the maximum number of nanoseconds to wait 739 * @return {@code true} if acquired; {@code false} if timed out 740 * @throws InterruptedException if the current thread is interrupted 741 */ tryAcquireSharedNanos(long arg, long nanosTimeout)742 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) 743 throws InterruptedException { 744 if (!Thread.interrupted()) { 745 if (tryAcquireShared(arg) >= 0) 746 return true; 747 if (nanosTimeout <= 0L) 748 return false; 749 int stat = acquire(null, arg, true, true, true, 750 System.nanoTime() + nanosTimeout); 751 if (stat > 0) 752 return true; 753 if (stat == 0) 754 return false; 755 } 756 throw new InterruptedException(); 757 } 758 759 /** 760 * Releases in shared mode. Implemented by unblocking one or more 761 * threads if {@link #tryReleaseShared} returns true. 762 * 763 * @param arg the release argument. This value is conveyed to 764 * {@link #tryReleaseShared} but is otherwise uninterpreted 765 * and can represent anything you like. 766 * @return the value returned from {@link #tryReleaseShared} 767 */ releaseShared(long arg)768 public final boolean releaseShared(long arg) { 769 if (tryReleaseShared(arg)) { 770 signalNext(head); 771 return true; 772 } 773 return false; 774 } 775 776 // Queue inspection methods 777 778 /** 779 * Queries whether any threads are waiting to acquire. Note that 780 * because cancellations due to interrupts and timeouts may occur 781 * at any time, a {@code true} return does not guarantee that any 782 * other thread will ever acquire. 783 * 784 * @return {@code true} if there may be other threads waiting to acquire 785 */ hasQueuedThreads()786 public final boolean hasQueuedThreads() { 787 for (Node p = tail, h = head; p != h && p != null; p = p.prev) 788 if (p.status >= 0) 789 return true; 790 return false; 791 } 792 793 /** 794 * Queries whether any threads have ever contended to acquire this 795 * synchronizer; that is, if an acquire method has ever blocked. 796 * 797 * <p>In this implementation, this operation returns in 798 * constant time. 799 * 800 * @return {@code true} if there has ever been contention 801 */ hasContended()802 public final boolean hasContended() { 803 return head != null; 804 } 805 806 /** 807 * Returns the first (longest-waiting) thread in the queue, or 808 * {@code null} if no threads are currently queued. 809 * 810 * <p>In this implementation, this operation normally returns in 811 * constant time, but may iterate upon contention if other threads are 812 * concurrently modifying the queue. 813 * 814 * @return the first (longest-waiting) thread in the queue, or 815 * {@code null} if no threads are currently queued 816 */ getFirstQueuedThread()817 public final Thread getFirstQueuedThread() { 818 Thread first = null, w; Node h, s; 819 if ((h = head) != null && ((s = h.next) == null || 820 (first = s.waiter) == null || 821 s.prev == null)) { 822 // traverse from tail on stale reads 823 for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) 824 if ((w = p.waiter) != null) 825 first = w; 826 } 827 return first; 828 } 829 830 /** 831 * Returns true if the given thread is currently queued. 832 * 833 * <p>This implementation traverses the queue to determine 834 * presence of the given thread. 835 * 836 * @param thread the thread 837 * @return {@code true} if the given thread is on the queue 838 * @throws NullPointerException if the thread is null 839 */ isQueued(Thread thread)840 public final boolean isQueued(Thread thread) { 841 if (thread == null) 842 throw new NullPointerException(); 843 for (Node p = tail; p != null; p = p.prev) 844 if (p.waiter == thread) 845 return true; 846 return false; 847 } 848 849 /** 850 * Returns {@code true} if the apparent first queued thread, if one 851 * exists, is waiting in exclusive mode. If this method returns 852 * {@code true}, and the current thread is attempting to acquire in 853 * shared mode (that is, this method is invoked from {@link 854 * #tryAcquireShared}) then it is guaranteed that the current thread 855 * is not the first queued thread. Used only as a heuristic in 856 * ReentrantReadWriteLock. 857 */ apparentlyFirstQueuedIsExclusive()858 final boolean apparentlyFirstQueuedIsExclusive() { 859 Node h, s; 860 return (h = head) != null && (s = h.next) != null && 861 !(s instanceof SharedNode) && s.waiter != null; 862 } 863 864 /** 865 * Queries whether any threads have been waiting to acquire longer 866 * than the current thread. 867 * 868 * <p>An invocation of this method is equivalent to (but may be 869 * more efficient than): 870 * <pre> {@code 871 * getFirstQueuedThread() != Thread.currentThread() 872 * && hasQueuedThreads()}</pre> 873 * 874 * <p>Note that because cancellations due to interrupts and 875 * timeouts may occur at any time, a {@code true} return does not 876 * guarantee that some other thread will acquire before the current 877 * thread. Likewise, it is possible for another thread to win a 878 * race to enqueue after this method has returned {@code false}, 879 * due to the queue being empty. 880 * 881 * <p>This method is designed to be used by a fair synchronizer to 882 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>. 883 * Such a synchronizer's {@link #tryAcquire} method should return 884 * {@code false}, and its {@link #tryAcquireShared} method should 885 * return a negative value, if this method returns {@code true} 886 * (unless this is a reentrant acquire). For example, the {@code 887 * tryAcquire} method for a fair, reentrant, exclusive mode 888 * synchronizer might look like this: 889 * 890 * <pre> {@code 891 * protected boolean tryAcquire(long arg) { 892 * if (isHeldExclusively()) { 893 * // A reentrant acquire; increment hold count 894 * return true; 895 * } else if (hasQueuedPredecessors()) { 896 * return false; 897 * } else { 898 * // try to acquire normally 899 * } 900 * }}</pre> 901 * 902 * @return {@code true} if there is a queued thread preceding the 903 * current thread, and {@code false} if the current thread 904 * is at the head of the queue or the queue is empty 905 * @since 1.7 906 */ hasQueuedPredecessors()907 public final boolean hasQueuedPredecessors() { 908 Thread first = null; Node h, s; 909 if ((h = head) != null && ((s = h.next) == null || 910 (first = s.waiter) == null || 911 s.prev == null)) 912 first = getFirstQueuedThread(); // retry via getFirstQueuedThread 913 return first != null && first != Thread.currentThread(); 914 } 915 916 // Instrumentation and monitoring methods 917 918 /** 919 * Returns an estimate of the number of threads waiting to 920 * acquire. The value is only an estimate because the number of 921 * threads may change dynamically while this method traverses 922 * internal data structures. This method is designed for use in 923 * monitoring system state, not for synchronization control. 924 * 925 * @return the estimated number of threads waiting to acquire 926 */ getQueueLength()927 public final int getQueueLength() { 928 int n = 0; 929 for (Node p = tail; p != null; p = p.prev) { 930 if (p.waiter != null) 931 ++n; 932 } 933 return n; 934 } 935 936 /** 937 * Returns a collection containing threads that may be waiting to 938 * acquire. Because the actual set of threads may change 939 * dynamically while constructing this result, the returned 940 * collection is only a best-effort estimate. The elements of the 941 * returned collection are in no particular order. This method is 942 * designed to facilitate construction of subclasses that provide 943 * more extensive monitoring facilities. 944 * 945 * @return the collection of threads 946 */ getQueuedThreads()947 public final Collection<Thread> getQueuedThreads() { 948 ArrayList<Thread> list = new ArrayList<>(); 949 for (Node p = tail; p != null; p = p.prev) { 950 Thread t = p.waiter; 951 if (t != null) 952 list.add(t); 953 } 954 return list; 955 } 956 957 /** 958 * Returns a collection containing threads that may be waiting to 959 * acquire in exclusive mode. This has the same properties 960 * as {@link #getQueuedThreads} except that it only returns 961 * those threads waiting due to an exclusive acquire. 962 * 963 * @return the collection of threads 964 */ getExclusiveQueuedThreads()965 public final Collection<Thread> getExclusiveQueuedThreads() { 966 ArrayList<Thread> list = new ArrayList<>(); 967 for (Node p = tail; p != null; p = p.prev) { 968 if (!(p instanceof SharedNode)) { 969 Thread t = p.waiter; 970 if (t != null) 971 list.add(t); 972 } 973 } 974 return list; 975 } 976 977 /** 978 * Returns a collection containing threads that may be waiting to 979 * acquire in shared mode. This has the same properties 980 * as {@link #getQueuedThreads} except that it only returns 981 * those threads waiting due to a shared acquire. 982 * 983 * @return the collection of threads 984 */ getSharedQueuedThreads()985 public final Collection<Thread> getSharedQueuedThreads() { 986 ArrayList<Thread> list = new ArrayList<>(); 987 for (Node p = tail; p != null; p = p.prev) { 988 if (p instanceof SharedNode) { 989 Thread t = p.waiter; 990 if (t != null) 991 list.add(t); 992 } 993 } 994 return list; 995 } 996 997 /** 998 * Returns a string identifying this synchronizer, as well as its state. 999 * The state, in brackets, includes the String {@code "State ="} 1000 * followed by the current value of {@link #getState}, and either 1001 * {@code "nonempty"} or {@code "empty"} depending on whether the 1002 * queue is empty. 1003 * 1004 * @return a string identifying this synchronizer, as well as its state 1005 */ toString()1006 public String toString() { 1007 return super.toString() 1008 + "[State = " + getState() + ", " 1009 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; 1010 } 1011 1012 // Instrumentation methods for conditions 1013 1014 /** 1015 * Queries whether the given ConditionObject 1016 * uses this synchronizer as its lock. 1017 * 1018 * @param condition the condition 1019 * @return {@code true} if owned 1020 * @throws NullPointerException if the condition is null 1021 */ owns(ConditionObject condition)1022 public final boolean owns(ConditionObject condition) { 1023 return condition.isOwnedBy(this); 1024 } 1025 1026 /** 1027 * Queries whether any threads are waiting on the given condition 1028 * associated with this synchronizer. Note that because timeouts 1029 * and interrupts may occur at any time, a {@code true} return 1030 * does not guarantee that a future {@code signal} will awaken 1031 * any threads. This method is designed primarily for use in 1032 * monitoring of the system state. 1033 * 1034 * @param condition the condition 1035 * @return {@code true} if there are any waiting threads 1036 * @throws IllegalMonitorStateException if exclusive synchronization 1037 * is not held 1038 * @throws IllegalArgumentException if the given condition is 1039 * not associated with this synchronizer 1040 * @throws NullPointerException if the condition is null 1041 */ hasWaiters(ConditionObject condition)1042 public final boolean hasWaiters(ConditionObject condition) { 1043 if (!owns(condition)) 1044 throw new IllegalArgumentException("Not owner"); 1045 return condition.hasWaiters(); 1046 } 1047 1048 /** 1049 * Returns an estimate of the number of threads waiting on the 1050 * given condition associated with this synchronizer. Note that 1051 * because timeouts and interrupts may occur at any time, the 1052 * estimate serves only as an upper bound on the actual number of 1053 * waiters. This method is designed for use in monitoring system 1054 * state, not for synchronization control. 1055 * 1056 * @param condition the condition 1057 * @return the estimated number of waiting threads 1058 * @throws IllegalMonitorStateException if exclusive synchronization 1059 * is not held 1060 * @throws IllegalArgumentException if the given condition is 1061 * not associated with this synchronizer 1062 * @throws NullPointerException if the condition is null 1063 */ getWaitQueueLength(ConditionObject condition)1064 public final int getWaitQueueLength(ConditionObject condition) { 1065 if (!owns(condition)) 1066 throw new IllegalArgumentException("Not owner"); 1067 return condition.getWaitQueueLength(); 1068 } 1069 1070 /** 1071 * Returns a collection containing those threads that may be 1072 * waiting on the given condition associated with this 1073 * synchronizer. Because the actual set of threads may change 1074 * dynamically while constructing this result, the returned 1075 * collection is only a best-effort estimate. The elements of the 1076 * returned collection are in no particular order. 1077 * 1078 * @param condition the condition 1079 * @return the collection of threads 1080 * @throws IllegalMonitorStateException if exclusive synchronization 1081 * is not held 1082 * @throws IllegalArgumentException if the given condition is 1083 * not associated with this synchronizer 1084 * @throws NullPointerException if the condition is null 1085 */ getWaitingThreads(ConditionObject condition)1086 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1087 if (!owns(condition)) 1088 throw new IllegalArgumentException("Not owner"); 1089 return condition.getWaitingThreads(); 1090 } 1091 1092 /** 1093 * Condition implementation for a {@link AbstractQueuedLongSynchronizer} 1094 * serving as the basis of a {@link Lock} implementation. 1095 * 1096 * <p>Method documentation for this class describes mechanics, 1097 * not behavioral specifications from the point of view of Lock 1098 * and Condition users. Exported versions of this class will in 1099 * general need to be accompanied by documentation describing 1100 * condition semantics that rely on those of the associated 1101 * {@code AbstractQueuedLongSynchronizer}. 1102 * 1103 * <p>This class is Serializable, but all fields are transient, 1104 * so deserialized conditions have no waiters. 1105 */ 1106 public class ConditionObject implements Condition, java.io.Serializable { 1107 private static final long serialVersionUID = 1173984872572414699L; 1108 /** First node of condition queue. */ 1109 private transient ConditionNode firstWaiter; 1110 /** Last node of condition queue. */ 1111 private transient ConditionNode lastWaiter; 1112 1113 /** 1114 * Fixed delay in nanoseconds between releasing and reacquiring 1115 * lock during Condition waits that encounter OutOfMemoryErrors 1116 */ 1117 static final long OOME_COND_WAIT_DELAY = 10L * 1000L * 1000L; // 10 ms 1118 1119 /** 1120 * Creates a new {@code ConditionObject} instance. 1121 */ ConditionObject()1122 public ConditionObject() { } 1123 1124 // Signalling methods 1125 1126 /** 1127 * Removes and transfers one or all waiters to sync queue. 1128 */ doSignal(ConditionNode first, boolean all)1129 private void doSignal(ConditionNode first, boolean all) { 1130 while (first != null) { 1131 ConditionNode next = first.nextWaiter; 1132 if ((firstWaiter = next) == null) 1133 lastWaiter = null; 1134 if ((first.getAndUnsetStatus(COND) & COND) != 0) { 1135 enqueue(first); 1136 if (!all) 1137 break; 1138 } 1139 first = next; 1140 } 1141 } 1142 1143 /** 1144 * Moves the longest-waiting thread, if one exists, from the 1145 * wait queue for this condition to the wait queue for the 1146 * owning lock. 1147 * 1148 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1149 * returns {@code false} 1150 */ signal()1151 public final void signal() { 1152 ConditionNode first = firstWaiter; 1153 if (!isHeldExclusively()) 1154 throw new IllegalMonitorStateException(); 1155 else if (first != null) 1156 doSignal(first, false); 1157 } 1158 1159 /** 1160 * Moves all threads from the wait queue for this condition to 1161 * the wait queue for the owning lock. 1162 * 1163 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1164 * returns {@code false} 1165 */ signalAll()1166 public final void signalAll() { 1167 ConditionNode first = firstWaiter; 1168 if (!isHeldExclusively()) 1169 throw new IllegalMonitorStateException(); 1170 else if (first != null) 1171 doSignal(first, true); 1172 } 1173 1174 // Waiting methods 1175 1176 /** 1177 * Adds node to condition list and releases lock. 1178 * 1179 * @param node the node 1180 * @return savedState to reacquire after wait 1181 */ enableWait(ConditionNode node)1182 private long enableWait(ConditionNode node) { 1183 if (isHeldExclusively()) { 1184 node.waiter = Thread.currentThread(); 1185 node.setStatusRelaxed(COND | WAITING); 1186 ConditionNode last = lastWaiter; 1187 if (last == null) 1188 firstWaiter = node; 1189 else 1190 last.nextWaiter = node; 1191 lastWaiter = node; 1192 long savedState = getState(); 1193 if (release(savedState)) 1194 return savedState; 1195 } 1196 node.status = CANCELLED; // lock not held or inconsistent 1197 throw new IllegalMonitorStateException(); 1198 } 1199 1200 /** 1201 * Returns true if a node that was initially placed on a condition 1202 * queue is now ready to reacquire on sync queue. 1203 * @param node the node 1204 * @return true if is reacquiring 1205 */ canReacquire(ConditionNode node)1206 private boolean canReacquire(ConditionNode node) { 1207 // check links, not status to avoid enqueue race 1208 Node p; // traverse unless known to be bidirectionally linked 1209 return node != null && (p = node.prev) != null && 1210 (p.next == node || isEnqueued(node)); 1211 } 1212 1213 /** 1214 * Unlinks the given node and other non-waiting nodes from 1215 * condition queue unless already unlinked. 1216 */ unlinkCancelledWaiters(ConditionNode node)1217 private void unlinkCancelledWaiters(ConditionNode node) { 1218 if (node == null || node.nextWaiter != null || node == lastWaiter) { 1219 ConditionNode w = firstWaiter, trail = null; 1220 while (w != null) { 1221 ConditionNode next = w.nextWaiter; 1222 if ((w.status & COND) == 0) { 1223 w.nextWaiter = null; 1224 if (trail == null) 1225 firstWaiter = next; 1226 else 1227 trail.nextWaiter = next; 1228 if (next == null) 1229 lastWaiter = trail; 1230 } else 1231 trail = w; 1232 w = next; 1233 } 1234 } 1235 } 1236 1237 /** 1238 * Constructs objects needed for condition wait. On OOME, 1239 * releases lock, sleeps, reacquires, and returns null. 1240 */ newConditionNode()1241 private ConditionNode newConditionNode() { 1242 long savedState; 1243 if (tryInitializeHead() != null) { 1244 try { 1245 return new ConditionNode(); 1246 } catch (OutOfMemoryError oome) { 1247 } 1248 } 1249 // fall through if encountered OutOfMemoryError 1250 if (!isHeldExclusively() || !release(savedState = getState())) 1251 throw new IllegalMonitorStateException(); 1252 U.park(false, OOME_COND_WAIT_DELAY); 1253 acquireOnOOME(false, savedState); 1254 return null; 1255 } 1256 1257 /** 1258 * Implements uninterruptible condition wait. 1259 * <ol> 1260 * <li>Save lock state returned by {@link #getState}. 1261 * <li>Invoke {@link #release} with saved state as argument, 1262 * throwing IllegalMonitorStateException if it fails. 1263 * <li>Block until signalled. 1264 * <li>Reacquire by invoking specialized version of 1265 * {@link #acquire} with saved state as argument. 1266 * </ol> 1267 */ awaitUninterruptibly()1268 public final void awaitUninterruptibly() { 1269 ConditionNode node = newConditionNode(); 1270 if (node == null) 1271 return; 1272 long savedState = enableWait(node); 1273 LockSupport.setCurrentBlocker(this); // for back-compatibility 1274 boolean interrupted = false, rejected = false; 1275 while (!canReacquire(node)) { 1276 if (Thread.interrupted()) 1277 interrupted = true; 1278 else if ((node.status & COND) != 0) { 1279 try { 1280 if (rejected) 1281 node.block(); 1282 else 1283 ForkJoinPool.managedBlock(node); 1284 } catch (RejectedExecutionException ex) { 1285 rejected = true; 1286 } catch (InterruptedException ie) { 1287 interrupted = true; 1288 } 1289 } else 1290 Thread.onSpinWait(); // awoke while enqueuing 1291 } 1292 LockSupport.setCurrentBlocker(null); 1293 node.clearStatus(); 1294 acquire(node, savedState, false, false, false, 0L); 1295 if (interrupted) 1296 Thread.currentThread().interrupt(); 1297 } 1298 1299 /** 1300 * Implements interruptible condition wait. 1301 * <ol> 1302 * <li>If current thread is interrupted, throw InterruptedException. 1303 * <li>Save lock state returned by {@link #getState}. 1304 * <li>Invoke {@link #release} with saved state as argument, 1305 * throwing IllegalMonitorStateException if it fails. 1306 * <li>Block until signalled or interrupted. 1307 * <li>Reacquire by invoking specialized version of 1308 * {@link #acquire} with saved state as argument. 1309 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1310 * </ol> 1311 */ await()1312 public final void await() throws InterruptedException { 1313 if (Thread.interrupted()) 1314 throw new InterruptedException(); 1315 ConditionNode node = newConditionNode(); 1316 if (node == null) 1317 return; 1318 long savedState = enableWait(node); 1319 LockSupport.setCurrentBlocker(this); // for back-compatibility 1320 boolean interrupted = false, cancelled = false, rejected = false; 1321 while (!canReacquire(node)) { 1322 if (interrupted |= Thread.interrupted()) { 1323 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1324 break; // else interrupted after signal 1325 } else if ((node.status & COND) != 0) { 1326 try { 1327 if (rejected) 1328 node.block(); 1329 else 1330 ForkJoinPool.managedBlock(node); 1331 } catch (RejectedExecutionException ex) { 1332 rejected = true; 1333 } catch (InterruptedException ie) { 1334 interrupted = true; 1335 } 1336 } else 1337 Thread.onSpinWait(); // awoke while enqueuing 1338 } 1339 LockSupport.setCurrentBlocker(null); 1340 node.clearStatus(); 1341 acquire(node, savedState, false, false, false, 0L); 1342 if (interrupted) { 1343 if (cancelled) { 1344 unlinkCancelledWaiters(node); 1345 throw new InterruptedException(); 1346 } 1347 Thread.currentThread().interrupt(); 1348 } 1349 } 1350 1351 /** 1352 * Implements timed condition wait. 1353 * <ol> 1354 * <li>If current thread is interrupted, throw InterruptedException. 1355 * <li>Save lock state returned by {@link #getState}. 1356 * <li>Invoke {@link #release} with saved state as argument, 1357 * throwing IllegalMonitorStateException if it fails. 1358 * <li>Block until signalled, interrupted, or timed out. 1359 * <li>Reacquire by invoking specialized version of 1360 * {@link #acquire} with saved state as argument. 1361 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1362 * </ol> 1363 */ awaitNanos(long nanosTimeout)1364 public final long awaitNanos(long nanosTimeout) 1365 throws InterruptedException { 1366 if (Thread.interrupted()) 1367 throw new InterruptedException(); 1368 ConditionNode node = newConditionNode(); 1369 if (node == null) 1370 return nanosTimeout - OOME_COND_WAIT_DELAY; 1371 long savedState = enableWait(node); 1372 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1373 long deadline = System.nanoTime() + nanos; 1374 boolean cancelled = false, interrupted = false; 1375 while (!canReacquire(node)) { 1376 if ((interrupted |= Thread.interrupted()) || 1377 (nanos = deadline - System.nanoTime()) <= 0L) { 1378 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1379 break; 1380 } else 1381 LockSupport.parkNanos(this, nanos); 1382 } 1383 node.clearStatus(); 1384 acquire(node, savedState, false, false, false, 0L); 1385 if (cancelled) { 1386 unlinkCancelledWaiters(node); 1387 if (interrupted) 1388 throw new InterruptedException(); 1389 } else if (interrupted) 1390 Thread.currentThread().interrupt(); 1391 long remaining = deadline - System.nanoTime(); // avoid overflow 1392 return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; 1393 } 1394 1395 /** 1396 * Implements absolute timed condition wait. 1397 * <ol> 1398 * <li>If current thread is interrupted, throw InterruptedException. 1399 * <li>Save lock state returned by {@link #getState}. 1400 * <li>Invoke {@link #release} with saved state as argument, 1401 * throwing IllegalMonitorStateException if it fails. 1402 * <li>Block until signalled, interrupted, or timed out. 1403 * <li>Reacquire by invoking specialized version of 1404 * {@link #acquire} with saved state as argument. 1405 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1406 * <li>If timed out while blocked in step 4, return false, else true. 1407 * </ol> 1408 */ awaitUntil(Date deadline)1409 public final boolean awaitUntil(Date deadline) 1410 throws InterruptedException { 1411 long abstime = deadline.getTime(); 1412 if (Thread.interrupted()) 1413 throw new InterruptedException(); 1414 ConditionNode node = newConditionNode(); 1415 if (node == null) 1416 return false; 1417 long savedState = enableWait(node); 1418 boolean cancelled = false, interrupted = false; 1419 while (!canReacquire(node)) { 1420 if ((interrupted |= Thread.interrupted()) || 1421 System.currentTimeMillis() >= abstime) { 1422 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1423 break; 1424 } else 1425 LockSupport.parkUntil(this, abstime); 1426 } 1427 node.clearStatus(); 1428 acquire(node, savedState, false, false, false, 0L); 1429 if (cancelled) { 1430 unlinkCancelledWaiters(node); 1431 if (interrupted) 1432 throw new InterruptedException(); 1433 } else if (interrupted) 1434 Thread.currentThread().interrupt(); 1435 return !cancelled; 1436 } 1437 1438 /** 1439 * Implements timed condition wait. 1440 * <ol> 1441 * <li>If current thread is interrupted, throw InterruptedException. 1442 * <li>Save lock state returned by {@link #getState}. 1443 * <li>Invoke {@link #release} with saved state as argument, 1444 * throwing IllegalMonitorStateException if it fails. 1445 * <li>Block until signalled, interrupted, or timed out. 1446 * <li>Reacquire by invoking specialized version of 1447 * {@link #acquire} with saved state as argument. 1448 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1449 * <li>If timed out while blocked in step 4, return false, else true. 1450 * </ol> 1451 */ await(long time, TimeUnit unit)1452 public final boolean await(long time, TimeUnit unit) 1453 throws InterruptedException { 1454 long nanosTimeout = unit.toNanos(time); 1455 if (Thread.interrupted()) 1456 throw new InterruptedException(); 1457 ConditionNode node = newConditionNode(); 1458 if (node == null) 1459 return false; 1460 long savedState = enableWait(node); 1461 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1462 long deadline = System.nanoTime() + nanos; 1463 boolean cancelled = false, interrupted = false; 1464 while (!canReacquire(node)) { 1465 if ((interrupted |= Thread.interrupted()) || 1466 (nanos = deadline - System.nanoTime()) <= 0L) { 1467 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1468 break; 1469 } else 1470 LockSupport.parkNanos(this, nanos); 1471 } 1472 node.clearStatus(); 1473 acquire(node, savedState, false, false, false, 0L); 1474 if (cancelled) { 1475 unlinkCancelledWaiters(node); 1476 if (interrupted) 1477 throw new InterruptedException(); 1478 } else if (interrupted) 1479 Thread.currentThread().interrupt(); 1480 return !cancelled; 1481 } 1482 1483 // support for instrumentation 1484 1485 /** 1486 * Returns true if this condition was created by the given 1487 * synchronization object. 1488 * 1489 * @return {@code true} if owned 1490 */ isOwnedBy(AbstractQueuedLongSynchronizer sync)1491 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) { 1492 return sync == AbstractQueuedLongSynchronizer.this; 1493 } 1494 1495 /** 1496 * Queries whether any threads are waiting on this condition. 1497 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}. 1498 * 1499 * @return {@code true} if there are any waiting threads 1500 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1501 * returns {@code false} 1502 */ hasWaiters()1503 protected final boolean hasWaiters() { 1504 if (!isHeldExclusively()) 1505 throw new IllegalMonitorStateException(); 1506 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1507 if ((w.status & COND) != 0) 1508 return true; 1509 } 1510 return false; 1511 } 1512 1513 /** 1514 * Returns an estimate of the number of threads waiting on 1515 * this condition. 1516 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}. 1517 * 1518 * @return the estimated number of waiting threads 1519 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1520 * returns {@code false} 1521 */ getWaitQueueLength()1522 protected final int getWaitQueueLength() { 1523 if (!isHeldExclusively()) 1524 throw new IllegalMonitorStateException(); 1525 int n = 0; 1526 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1527 if ((w.status & COND) != 0) 1528 ++n; 1529 } 1530 return n; 1531 } 1532 1533 /** 1534 * Returns a collection containing those threads that may be 1535 * waiting on this Condition. 1536 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}. 1537 * 1538 * @return the collection of threads 1539 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1540 * returns {@code false} 1541 */ getWaitingThreads()1542 protected final Collection<Thread> getWaitingThreads() { 1543 if (!isHeldExclusively()) 1544 throw new IllegalMonitorStateException(); 1545 ArrayList<Thread> list = new ArrayList<>(); 1546 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1547 if ((w.status & COND) != 0) { 1548 Thread t = w.waiter; 1549 if (t != null) 1550 list.add(t); 1551 } 1552 } 1553 return list; 1554 } 1555 } 1556 1557 // Unsafe 1558 private static final Unsafe U = Unsafe.getUnsafe(); 1559 private static final long STATE 1560 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state"); 1561 private static final long HEAD 1562 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head"); 1563 private static final long TAIL 1564 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail"); 1565 1566 static { 1567 Class<?> ensureLoaded = LockSupport.class; 1568 } 1569 } 1570