1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent.locks; 37 38 import java.util.ArrayList; 39 import java.util.Collection; 40 import java.util.Date; 41 import java.util.concurrent.TimeUnit; 42 import java.util.concurrent.ForkJoinPool; 43 import java.util.concurrent.RejectedExecutionException; 44 import jdk.internal.misc.Unsafe; 45 46 /** 47 * A version of {@link AbstractQueuedSynchronizer} in 48 * which synchronization state is maintained as a {@code long}. 49 * This class has exactly the same structure, properties, and methods 50 * as {@code AbstractQueuedSynchronizer} with the exception 51 * that all state-related parameters and results are defined 52 * as {@code long} rather than {@code int}. This class 53 * may be useful when creating synchronizers such as 54 * multilevel locks and barriers that require 55 * 64 bits of state. 56 * 57 * <p>See {@link AbstractQueuedSynchronizer} for usage 58 * notes and examples. 59 * 60 * @since 1.6 61 * @author Doug Lea 62 */ 63 public abstract class AbstractQueuedLongSynchronizer 64 extends AbstractOwnableSynchronizer 65 implements java.io.Serializable { 66 67 private static final long serialVersionUID = 7373984972572414692L; 68 69 /** 70 * Constructor for subclasses to call. 71 */ 72 // Android-changed: Keep the constructor protected. 73 // public AbstractQueuedLongSynchronizer() {} AbstractQueuedLongSynchronizer()74 protected AbstractQueuedLongSynchronizer() {} 75 76 /* 77 * To keep sources in sync, the remainder of this source file is 78 * exactly cloned from AbstractQueuedSynchronizer, replacing class 79 * name and changing ints related with sync state to longs. Please 80 * keep it that way. 81 */ 82 83 // Node status bits, also used as argument and return values 84 static final int WAITING = 1; // must be 1 85 static final int CANCELLED = 0x80000000; // must be negative 86 static final int COND = 2; // in a condition wait 87 88 /** CLH Nodes */ 89 abstract static class Node { 90 volatile Node prev; // initially attached via casTail 91 volatile Node next; // visibly nonnull when signallable 92 Thread waiter; // visibly nonnull when enqueued 93 volatile int status; // written by owner, atomic bit ops by others 94 95 // methods for atomic operations casPrev(Node c, Node v)96 final boolean casPrev(Node c, Node v) { // for cleanQueue 97 return U.weakCompareAndSetReference(this, PREV, c, v); 98 } casNext(Node c, Node v)99 final boolean casNext(Node c, Node v) { // for cleanQueue 100 return U.weakCompareAndSetReference(this, NEXT, c, v); 101 } getAndUnsetStatus(int v)102 final int getAndUnsetStatus(int v) { // for signalling 103 return U.getAndBitwiseAndInt(this, STATUS, ~v); 104 } setPrevRelaxed(Node p)105 final void setPrevRelaxed(Node p) { // for off-queue assignment 106 U.putReference(this, PREV, p); 107 } setStatusRelaxed(int s)108 final void setStatusRelaxed(int s) { // for off-queue assignment 109 U.putInt(this, STATUS, s); 110 } clearStatus()111 final void clearStatus() { // for reducing unneeded signals 112 U.putIntOpaque(this, STATUS, 0); 113 } 114 115 private static final long STATUS 116 = U.objectFieldOffset(Node.class, "status"); 117 private static final long NEXT 118 = U.objectFieldOffset(Node.class, "next"); 119 private static final long PREV 120 = U.objectFieldOffset(Node.class, "prev"); 121 } 122 123 // Concrete classes tagged by type 124 static final class ExclusiveNode extends Node { } 125 static final class SharedNode extends Node { } 126 127 static final class ConditionNode extends Node 128 implements ForkJoinPool.ManagedBlocker { 129 ConditionNode nextWaiter; // link to next waiting node 130 131 /** 132 * Allows Conditions to be used in ForkJoinPools without 133 * risking fixed pool exhaustion. This is usable only for 134 * untimed Condition waits, not timed versions. 135 */ isReleasable()136 public final boolean isReleasable() { 137 return status <= 1 || Thread.currentThread().isInterrupted(); 138 } 139 block()140 public final boolean block() { 141 while (!isReleasable()) LockSupport.park(); 142 return true; 143 } 144 } 145 146 /** 147 * Head of the wait queue, lazily initialized. 148 */ 149 private transient volatile Node head; 150 151 /** 152 * Tail of the wait queue. After initialization, modified only via casTail. 153 */ 154 private transient volatile Node tail; 155 156 /** 157 * The synchronization state. 158 */ 159 private volatile long state; 160 161 /** 162 * Returns the current value of synchronization state. 163 * This operation has memory semantics of a {@code volatile} read. 164 * @return current state value 165 */ getState()166 protected final long getState() { 167 return state; 168 } 169 170 /** 171 * Sets the value of synchronization state. 172 * This operation has memory semantics of a {@code volatile} write. 173 * @param newState the new state value 174 */ setState(long newState)175 protected final void setState(long newState) { 176 state = newState; 177 } 178 179 /** 180 * Atomically sets synchronization state to the given updated 181 * value if the current state value equals the expected value. 182 * This operation has memory semantics of a {@code volatile} read 183 * and write. 184 * 185 * @param expect the expected value 186 * @param update the new value 187 * @return {@code true} if successful. False return indicates that the actual 188 * value was not equal to the expected value. 189 */ compareAndSetState(long expect, long update)190 protected final boolean compareAndSetState(long expect, long update) { 191 return U.compareAndSetLong(this, STATE, expect, update); 192 } 193 194 // Queuing utilities 195 casTail(Node c, Node v)196 private boolean casTail(Node c, Node v) { 197 return U.compareAndSetReference(this, TAIL, c, v); 198 } 199 200 /** tries once to CAS a new dummy node for head */ tryInitializeHead()201 private void tryInitializeHead() { 202 Node h = new ExclusiveNode(); 203 if (U.compareAndSetReference(this, HEAD, null, h)) 204 tail = h; 205 } 206 207 /** 208 * Enqueues the node unless null. (Currently used only for 209 * ConditionNodes; other cases are interleaved with acquires.) 210 */ enqueue(Node node)211 final void enqueue(Node node) { 212 if (node != null) { 213 for (;;) { 214 Node t = tail; 215 node.setPrevRelaxed(t); // avoid unnecessary fence 216 if (t == null) // initialize 217 tryInitializeHead(); 218 else if (casTail(t, node)) { 219 t.next = node; 220 if (t.status < 0) // wake up to clean link 221 LockSupport.unpark(node.waiter); 222 break; 223 } 224 } 225 } 226 } 227 228 /** Returns true if node is found in traversal from tail */ isEnqueued(Node node)229 final boolean isEnqueued(Node node) { 230 for (Node t = tail; t != null; t = t.prev) 231 if (t == node) 232 return true; 233 return false; 234 } 235 236 /** 237 * Wakes up the successor of given node, if one exists, and unsets its 238 * WAITING status to avoid park race. This may fail to wake up an 239 * eligible thread when one or more have been cancelled, but 240 * cancelAcquire ensures liveness. 241 */ signalNext(Node h)242 private static void signalNext(Node h) { 243 Node s; 244 if (h != null && (s = h.next) != null && s.status != 0) { 245 s.getAndUnsetStatus(WAITING); 246 LockSupport.unpark(s.waiter); 247 } 248 } 249 250 /** Wakes up the given node if in shared mode */ signalNextIfShared(Node h)251 private static void signalNextIfShared(Node h) { 252 Node s; 253 if (h != null && (s = h.next) != null && 254 (s instanceof SharedNode) && s.status != 0) { 255 s.getAndUnsetStatus(WAITING); 256 LockSupport.unpark(s.waiter); 257 } 258 } 259 260 /** 261 * Main acquire method, invoked by all exported acquire methods. 262 * 263 * @param node null unless a reacquiring Condition 264 * @param arg the acquire argument 265 * @param shared true if shared mode else exclusive 266 * @param interruptible if abort and return negative on interrupt 267 * @param timed if true use timed waits 268 * @param time if timed, the System.nanoTime value to timeout 269 * @return positive if acquired, 0 if timed out, negative if interrupted 270 */ acquire(Node node, long arg, boolean shared, boolean interruptible, boolean timed, long time)271 final int acquire(Node node, long arg, boolean shared, 272 boolean interruptible, boolean timed, long time) { 273 Thread current = Thread.currentThread(); 274 byte spins = 0, postSpins = 0; // retries upon unpark of first thread 275 boolean interrupted = false, first = false; 276 Node pred = null; // predecessor of node when enqueued 277 278 /* 279 * Repeatedly: 280 * Check if node now first 281 * if so, ensure head stable, else ensure valid predecessor 282 * if node is first or not yet enqueued, try acquiring 283 * else if node not yet created, create it 284 * else if not yet enqueued, try once to enqueue 285 * else if woken from park, retry (up to postSpins times) 286 * else if WAITING status not set, set and retry 287 * else park and clear WAITING status, and check cancellation 288 */ 289 290 for (;;) { 291 if (!first && (pred = (node == null) ? null : node.prev) != null && 292 !(first = (head == pred))) { 293 if (pred.status < 0) { 294 cleanQueue(); // predecessor cancelled 295 continue; 296 } else if (pred.prev == null) { 297 Thread.onSpinWait(); // ensure serialization 298 continue; 299 } 300 } 301 if (first || pred == null) { 302 boolean acquired; 303 try { 304 if (shared) 305 acquired = (tryAcquireShared(arg) >= 0); 306 else 307 acquired = tryAcquire(arg); 308 } catch (Throwable ex) { 309 cancelAcquire(node, interrupted, false); 310 throw ex; 311 } 312 if (acquired) { 313 if (first) { 314 node.prev = null; 315 head = node; 316 pred.next = null; 317 node.waiter = null; 318 if (shared) 319 signalNextIfShared(node); 320 if (interrupted) 321 current.interrupt(); 322 } 323 return 1; 324 } 325 } 326 if (node == null) { // allocate; retry before enqueue 327 if (shared) 328 node = new SharedNode(); 329 else 330 node = new ExclusiveNode(); 331 } else if (pred == null) { // try to enqueue 332 node.waiter = current; 333 Node t = tail; 334 node.setPrevRelaxed(t); // avoid unnecessary fence 335 if (t == null) 336 tryInitializeHead(); 337 else if (!casTail(t, node)) 338 node.setPrevRelaxed(null); // back out 339 else 340 t.next = node; 341 } else if (first && spins != 0) { 342 --spins; // reduce unfairness on rewaits 343 Thread.onSpinWait(); 344 } else if (node.status == 0) { 345 node.status = WAITING; // enable signal and recheck 346 } else { 347 long nanos; 348 spins = postSpins = (byte)((postSpins << 1) | 1); 349 if (!timed) 350 LockSupport.park(this); 351 else if ((nanos = time - System.nanoTime()) > 0L) 352 LockSupport.parkNanos(this, nanos); 353 else 354 break; 355 node.clearStatus(); 356 if ((interrupted |= Thread.interrupted()) && interruptible) 357 break; 358 } 359 } 360 return cancelAcquire(node, interrupted, interruptible); 361 } 362 363 /** 364 * Possibly repeatedly traverses from tail, unsplicing cancelled 365 * nodes until none are found. 366 */ cleanQueue()367 private void cleanQueue() { 368 for (;;) { // restart point 369 for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples 370 if (q == null || (p = q.prev) == null) 371 return; // end of list 372 if (s == null ? tail != q : (s.prev != q || s.status < 0)) 373 break; // inconsistent 374 if (q.status < 0) { // cancelled 375 if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && 376 q.prev == p) { 377 p.casNext(q, s); // OK if fails 378 if (p.prev == null) 379 signalNext(p); 380 } 381 break; 382 } 383 if ((n = p.next) != q) { // help finish 384 if (n != null && q.prev == p) { 385 p.casNext(n, q); 386 if (p.prev == null) 387 signalNext(p); 388 } 389 break; 390 } 391 s = q; 392 q = q.prev; 393 } 394 } 395 } 396 397 /** 398 * Cancels an ongoing attempt to acquire. 399 * 400 * @param node the node (may be null if cancelled before enqueuing) 401 * @param interrupted true if thread interrupted 402 * @param interruptible if should report interruption vs reset 403 */ cancelAcquire(Node node, boolean interrupted, boolean interruptible)404 private int cancelAcquire(Node node, boolean interrupted, 405 boolean interruptible) { 406 if (node != null) { 407 node.waiter = null; 408 node.status = CANCELLED; 409 if (node.prev != null) 410 cleanQueue(); 411 } 412 if (interrupted) { 413 if (interruptible) 414 return CANCELLED; 415 else 416 Thread.currentThread().interrupt(); 417 } 418 return 0; 419 } 420 421 // Main exported methods 422 423 /** 424 * Attempts to acquire in exclusive mode. This method should query 425 * if the state of the object permits it to be acquired in the 426 * exclusive mode, and if so to acquire it. 427 * 428 * <p>This method is always invoked by the thread performing 429 * acquire. If this method reports failure, the acquire method 430 * may queue the thread, if it is not already queued, until it is 431 * signalled by a release from some other thread. This can be used 432 * to implement method {@link Lock#tryLock()}. 433 * 434 * <p>The default 435 * implementation throws {@link UnsupportedOperationException}. 436 * 437 * @param arg the acquire argument. This value is always the one 438 * passed to an acquire method, or is the value saved on entry 439 * to a condition wait. The value is otherwise uninterpreted 440 * and can represent anything you like. 441 * @return {@code true} if successful. Upon success, this object has 442 * been acquired. 443 * @throws IllegalMonitorStateException if acquiring would place this 444 * synchronizer in an illegal state. This exception must be 445 * thrown in a consistent fashion for synchronization to work 446 * correctly. 447 * @throws UnsupportedOperationException if exclusive mode is not supported 448 */ tryAcquire(long arg)449 protected boolean tryAcquire(long arg) { 450 throw new UnsupportedOperationException(); 451 } 452 453 /** 454 * Attempts to set the state to reflect a release in exclusive 455 * mode. 456 * 457 * <p>This method is always invoked by the thread performing release. 458 * 459 * <p>The default implementation throws 460 * {@link UnsupportedOperationException}. 461 * 462 * @param arg the release argument. This value is always the one 463 * passed to a release method, or the current state value upon 464 * entry to a condition wait. The value is otherwise 465 * uninterpreted and can represent anything you like. 466 * @return {@code true} if this object is now in a fully released 467 * state, so that any waiting threads may attempt to acquire; 468 * and {@code false} otherwise. 469 * @throws IllegalMonitorStateException if releasing would place this 470 * synchronizer in an illegal state. This exception must be 471 * thrown in a consistent fashion for synchronization to work 472 * correctly. 473 * @throws UnsupportedOperationException if exclusive mode is not supported 474 */ tryRelease(long arg)475 protected boolean tryRelease(long arg) { 476 throw new UnsupportedOperationException(); 477 } 478 479 /** 480 * Attempts to acquire in shared mode. This method should query if 481 * the state of the object permits it to be acquired in the shared 482 * mode, and if so to acquire it. 483 * 484 * <p>This method is always invoked by the thread performing 485 * acquire. If this method reports failure, the acquire method 486 * may queue the thread, if it is not already queued, until it is 487 * signalled by a release from some other thread. 488 * 489 * <p>The default implementation throws {@link 490 * UnsupportedOperationException}. 491 * 492 * @param arg the acquire argument. This value is always the one 493 * passed to an acquire method, or is the value saved on entry 494 * to a condition wait. The value is otherwise uninterpreted 495 * and can represent anything you like. 496 * @return a negative value on failure; zero if acquisition in shared 497 * mode succeeded but no subsequent shared-mode acquire can 498 * succeed; and a positive value if acquisition in shared 499 * mode succeeded and subsequent shared-mode acquires might 500 * also succeed, in which case a subsequent waiting thread 501 * must check availability. (Support for three different 502 * return values enables this method to be used in contexts 503 * where acquires only sometimes act exclusively.) Upon 504 * success, this object has been acquired. 505 * @throws IllegalMonitorStateException if acquiring would place this 506 * synchronizer in an illegal state. This exception must be 507 * thrown in a consistent fashion for synchronization to work 508 * correctly. 509 * @throws UnsupportedOperationException if shared mode is not supported 510 */ tryAcquireShared(long arg)511 protected long tryAcquireShared(long arg) { 512 throw new UnsupportedOperationException(); 513 } 514 515 /** 516 * Attempts to set the state to reflect a release in shared mode. 517 * 518 * <p>This method is always invoked by the thread performing release. 519 * 520 * <p>The default implementation throws 521 * {@link UnsupportedOperationException}. 522 * 523 * @param arg the release argument. This value is always the one 524 * passed to a release method, or the current state value upon 525 * entry to a condition wait. The value is otherwise 526 * uninterpreted and can represent anything you like. 527 * @return {@code true} if this release of shared mode may permit a 528 * waiting acquire (shared or exclusive) to succeed; and 529 * {@code false} otherwise 530 * @throws IllegalMonitorStateException if releasing would place this 531 * synchronizer in an illegal state. This exception must be 532 * thrown in a consistent fashion for synchronization to work 533 * correctly. 534 * @throws UnsupportedOperationException if shared mode is not supported 535 */ tryReleaseShared(long arg)536 protected boolean tryReleaseShared(long arg) { 537 throw new UnsupportedOperationException(); 538 } 539 540 /** 541 * Returns {@code true} if synchronization is held exclusively with 542 * respect to the current (calling) thread. This method is invoked 543 * upon each call to a {@link ConditionObject} method. 544 * 545 * <p>The default implementation throws {@link 546 * UnsupportedOperationException}. This method is invoked 547 * internally only within {@link ConditionObject} methods, so need 548 * not be defined if conditions are not used. 549 * 550 * @return {@code true} if synchronization is held exclusively; 551 * {@code false} otherwise 552 * @throws UnsupportedOperationException if conditions are not supported 553 */ isHeldExclusively()554 protected boolean isHeldExclusively() { 555 throw new UnsupportedOperationException(); 556 } 557 558 /** 559 * Acquires in exclusive mode, ignoring interrupts. Implemented 560 * by invoking at least once {@link #tryAcquire}, 561 * returning on success. Otherwise the thread is queued, possibly 562 * repeatedly blocking and unblocking, invoking {@link 563 * #tryAcquire} until success. This method can be used 564 * to implement method {@link Lock#lock}. 565 * 566 * @param arg the acquire argument. This value is conveyed to 567 * {@link #tryAcquire} but is otherwise uninterpreted and 568 * can represent anything you like. 569 */ acquire(long arg)570 public final void acquire(long arg) { 571 if (!tryAcquire(arg)) 572 acquire(null, arg, false, false, false, 0L); 573 } 574 575 /** 576 * Acquires in exclusive mode, aborting if interrupted. 577 * Implemented by first checking interrupt status, then invoking 578 * at least once {@link #tryAcquire}, returning on 579 * success. Otherwise the thread is queued, possibly repeatedly 580 * blocking and unblocking, invoking {@link #tryAcquire} 581 * until success or the thread is interrupted. This method can be 582 * used to implement method {@link Lock#lockInterruptibly}. 583 * 584 * @param arg the acquire argument. This value is conveyed to 585 * {@link #tryAcquire} but is otherwise uninterpreted and 586 * can represent anything you like. 587 * @throws InterruptedException if the current thread is interrupted 588 */ acquireInterruptibly(long arg)589 public final void acquireInterruptibly(long arg) 590 throws InterruptedException { 591 if (Thread.interrupted() || 592 (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) 593 throw new InterruptedException(); 594 } 595 596 /** 597 * Attempts to acquire in exclusive mode, aborting if interrupted, 598 * and failing if the given timeout elapses. Implemented by first 599 * checking interrupt status, then invoking at least once {@link 600 * #tryAcquire}, returning on success. Otherwise, the thread is 601 * queued, possibly repeatedly blocking and unblocking, invoking 602 * {@link #tryAcquire} until success or the thread is interrupted 603 * or the timeout elapses. This method can be used to implement 604 * method {@link Lock#tryLock(long, TimeUnit)}. 605 * 606 * @param arg the acquire argument. This value is conveyed to 607 * {@link #tryAcquire} but is otherwise uninterpreted and 608 * can represent anything you like. 609 * @param nanosTimeout the maximum number of nanoseconds to wait 610 * @return {@code true} if acquired; {@code false} if timed out 611 * @throws InterruptedException if the current thread is interrupted 612 */ tryAcquireNanos(long arg, long nanosTimeout)613 public final boolean tryAcquireNanos(long arg, long nanosTimeout) 614 throws InterruptedException { 615 if (!Thread.interrupted()) { 616 if (tryAcquire(arg)) 617 return true; 618 if (nanosTimeout <= 0L) 619 return false; 620 int stat = acquire(null, arg, false, true, true, 621 System.nanoTime() + nanosTimeout); 622 if (stat > 0) 623 return true; 624 if (stat == 0) 625 return false; 626 } 627 throw new InterruptedException(); 628 } 629 630 /** 631 * Releases in exclusive mode. Implemented by unblocking one or 632 * more threads if {@link #tryRelease} returns true. 633 * This method can be used to implement method {@link Lock#unlock}. 634 * 635 * @param arg the release argument. This value is conveyed to 636 * {@link #tryRelease} but is otherwise uninterpreted and 637 * can represent anything you like. 638 * @return the value returned from {@link #tryRelease} 639 */ release(long arg)640 public final boolean release(long arg) { 641 if (tryRelease(arg)) { 642 signalNext(head); 643 return true; 644 } 645 return false; 646 } 647 648 /** 649 * Acquires in shared mode, ignoring interrupts. Implemented by 650 * first invoking at least once {@link #tryAcquireShared}, 651 * returning on success. Otherwise the thread is queued, possibly 652 * repeatedly blocking and unblocking, invoking {@link 653 * #tryAcquireShared} until success. 654 * 655 * @param arg the acquire argument. This value is conveyed to 656 * {@link #tryAcquireShared} but is otherwise uninterpreted 657 * and can represent anything you like. 658 */ acquireShared(long arg)659 public final void acquireShared(long arg) { 660 if (tryAcquireShared(arg) < 0) 661 acquire(null, arg, true, false, false, 0L); 662 } 663 664 /** 665 * Acquires in shared mode, aborting if interrupted. Implemented 666 * by first checking interrupt status, then invoking at least once 667 * {@link #tryAcquireShared}, returning on success. Otherwise the 668 * thread is queued, possibly repeatedly blocking and unblocking, 669 * invoking {@link #tryAcquireShared} until success or the thread 670 * is interrupted. 671 * @param arg the acquire argument. 672 * This value is conveyed to {@link #tryAcquireShared} but is 673 * otherwise uninterpreted and can represent anything 674 * you like. 675 * @throws InterruptedException if the current thread is interrupted 676 */ acquireSharedInterruptibly(long arg)677 public final void acquireSharedInterruptibly(long arg) 678 throws InterruptedException { 679 if (Thread.interrupted() || 680 (tryAcquireShared(arg) < 0 && 681 acquire(null, arg, true, true, false, 0L) < 0)) 682 throw new InterruptedException(); 683 } 684 685 /** 686 * Attempts to acquire in shared mode, aborting if interrupted, and 687 * failing if the given timeout elapses. Implemented by first 688 * checking interrupt status, then invoking at least once {@link 689 * #tryAcquireShared}, returning on success. Otherwise, the 690 * thread is queued, possibly repeatedly blocking and unblocking, 691 * invoking {@link #tryAcquireShared} until success or the thread 692 * is interrupted or the timeout elapses. 693 * 694 * @param arg the acquire argument. This value is conveyed to 695 * {@link #tryAcquireShared} but is otherwise uninterpreted 696 * and can represent anything you like. 697 * @param nanosTimeout the maximum number of nanoseconds to wait 698 * @return {@code true} if acquired; {@code false} if timed out 699 * @throws InterruptedException if the current thread is interrupted 700 */ tryAcquireSharedNanos(long arg, long nanosTimeout)701 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) 702 throws InterruptedException { 703 if (!Thread.interrupted()) { 704 if (tryAcquireShared(arg) >= 0) 705 return true; 706 if (nanosTimeout <= 0L) 707 return false; 708 int stat = acquire(null, arg, true, true, true, 709 System.nanoTime() + nanosTimeout); 710 if (stat > 0) 711 return true; 712 if (stat == 0) 713 return false; 714 } 715 throw new InterruptedException(); 716 } 717 718 /** 719 * Releases in shared mode. Implemented by unblocking one or more 720 * threads if {@link #tryReleaseShared} returns true. 721 * 722 * @param arg the release argument. This value is conveyed to 723 * {@link #tryReleaseShared} but is otherwise uninterpreted 724 * and can represent anything you like. 725 * @return the value returned from {@link #tryReleaseShared} 726 */ releaseShared(long arg)727 public final boolean releaseShared(long arg) { 728 if (tryReleaseShared(arg)) { 729 signalNext(head); 730 return true; 731 } 732 return false; 733 } 734 735 // Queue inspection methods 736 737 /** 738 * Queries whether any threads are waiting to acquire. Note that 739 * because cancellations due to interrupts and timeouts may occur 740 * at any time, a {@code true} return does not guarantee that any 741 * other thread will ever acquire. 742 * 743 * @return {@code true} if there may be other threads waiting to acquire 744 */ hasQueuedThreads()745 public final boolean hasQueuedThreads() { 746 for (Node p = tail, h = head; p != h && p != null; p = p.prev) 747 if (p.status >= 0) 748 return true; 749 return false; 750 } 751 752 /** 753 * Queries whether any threads have ever contended to acquire this 754 * synchronizer; that is, if an acquire method has ever blocked. 755 * 756 * <p>In this implementation, this operation returns in 757 * constant time. 758 * 759 * @return {@code true} if there has ever been contention 760 */ hasContended()761 public final boolean hasContended() { 762 return head != null; 763 } 764 765 /** 766 * Returns the first (longest-waiting) thread in the queue, or 767 * {@code null} if no threads are currently queued. 768 * 769 * <p>In this implementation, this operation normally returns in 770 * constant time, but may iterate upon contention if other threads are 771 * concurrently modifying the queue. 772 * 773 * @return the first (longest-waiting) thread in the queue, or 774 * {@code null} if no threads are currently queued 775 */ getFirstQueuedThread()776 public final Thread getFirstQueuedThread() { 777 Thread first = null, w; Node h, s; 778 if ((h = head) != null && ((s = h.next) == null || 779 (first = s.waiter) == null || 780 s.prev == null)) { 781 // traverse from tail on stale reads 782 for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) 783 if ((w = p.waiter) != null) 784 first = w; 785 } 786 return first; 787 } 788 789 /** 790 * Returns true if the given thread is currently queued. 791 * 792 * <p>This implementation traverses the queue to determine 793 * presence of the given thread. 794 * 795 * @param thread the thread 796 * @return {@code true} if the given thread is on the queue 797 * @throws NullPointerException if the thread is null 798 */ isQueued(Thread thread)799 public final boolean isQueued(Thread thread) { 800 if (thread == null) 801 throw new NullPointerException(); 802 for (Node p = tail; p != null; p = p.prev) 803 if (p.waiter == thread) 804 return true; 805 return false; 806 } 807 808 /** 809 * Returns {@code true} if the apparent first queued thread, if one 810 * exists, is waiting in exclusive mode. If this method returns 811 * {@code true}, and the current thread is attempting to acquire in 812 * shared mode (that is, this method is invoked from {@link 813 * #tryAcquireShared}) then it is guaranteed that the current thread 814 * is not the first queued thread. Used only as a heuristic in 815 * ReentrantReadWriteLock. 816 */ apparentlyFirstQueuedIsExclusive()817 final boolean apparentlyFirstQueuedIsExclusive() { 818 Node h, s; 819 return (h = head) != null && (s = h.next) != null && 820 !(s instanceof SharedNode) && s.waiter != null; 821 } 822 823 /** 824 * Queries whether any threads have been waiting to acquire longer 825 * than the current thread. 826 * 827 * <p>An invocation of this method is equivalent to (but may be 828 * more efficient than): 829 * <pre> {@code 830 * getFirstQueuedThread() != Thread.currentThread() 831 * && hasQueuedThreads()}</pre> 832 * 833 * <p>Note that because cancellations due to interrupts and 834 * timeouts may occur at any time, a {@code true} return does not 835 * guarantee that some other thread will acquire before the current 836 * thread. Likewise, it is possible for another thread to win a 837 * race to enqueue after this method has returned {@code false}, 838 * due to the queue being empty. 839 * 840 * <p>This method is designed to be used by a fair synchronizer to 841 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>. 842 * Such a synchronizer's {@link #tryAcquire} method should return 843 * {@code false}, and its {@link #tryAcquireShared} method should 844 * return a negative value, if this method returns {@code true} 845 * (unless this is a reentrant acquire). For example, the {@code 846 * tryAcquire} method for a fair, reentrant, exclusive mode 847 * synchronizer might look like this: 848 * 849 * <pre> {@code 850 * protected boolean tryAcquire(long arg) { 851 * if (isHeldExclusively()) { 852 * // A reentrant acquire; increment hold count 853 * return true; 854 * } else if (hasQueuedPredecessors()) { 855 * return false; 856 * } else { 857 * // try to acquire normally 858 * } 859 * }}</pre> 860 * 861 * @return {@code true} if there is a queued thread preceding the 862 * current thread, and {@code false} if the current thread 863 * is at the head of the queue or the queue is empty 864 * @since 1.7 865 */ hasQueuedPredecessors()866 public final boolean hasQueuedPredecessors() { 867 Thread first = null; Node h, s; 868 if ((h = head) != null && ((s = h.next) == null || 869 (first = s.waiter) == null || 870 s.prev == null)) 871 first = getFirstQueuedThread(); // retry via getFirstQueuedThread 872 return first != null && first != Thread.currentThread(); 873 } 874 875 // Instrumentation and monitoring methods 876 877 /** 878 * Returns an estimate of the number of threads waiting to 879 * acquire. The value is only an estimate because the number of 880 * threads may change dynamically while this method traverses 881 * internal data structures. This method is designed for use in 882 * monitoring system state, not for synchronization control. 883 * 884 * @return the estimated number of threads waiting to acquire 885 */ getQueueLength()886 public final int getQueueLength() { 887 int n = 0; 888 for (Node p = tail; p != null; p = p.prev) { 889 if (p.waiter != null) 890 ++n; 891 } 892 return n; 893 } 894 895 /** 896 * Returns a collection containing threads that may be waiting to 897 * acquire. Because the actual set of threads may change 898 * dynamically while constructing this result, the returned 899 * collection is only a best-effort estimate. The elements of the 900 * returned collection are in no particular order. This method is 901 * designed to facilitate construction of subclasses that provide 902 * more extensive monitoring facilities. 903 * 904 * @return the collection of threads 905 */ getQueuedThreads()906 public final Collection<Thread> getQueuedThreads() { 907 ArrayList<Thread> list = new ArrayList<>(); 908 for (Node p = tail; p != null; p = p.prev) { 909 Thread t = p.waiter; 910 if (t != null) 911 list.add(t); 912 } 913 return list; 914 } 915 916 /** 917 * Returns a collection containing threads that may be waiting to 918 * acquire in exclusive mode. This has the same properties 919 * as {@link #getQueuedThreads} except that it only returns 920 * those threads waiting due to an exclusive acquire. 921 * 922 * @return the collection of threads 923 */ getExclusiveQueuedThreads()924 public final Collection<Thread> getExclusiveQueuedThreads() { 925 ArrayList<Thread> list = new ArrayList<>(); 926 for (Node p = tail; p != null; p = p.prev) { 927 if (!(p instanceof SharedNode)) { 928 Thread t = p.waiter; 929 if (t != null) 930 list.add(t); 931 } 932 } 933 return list; 934 } 935 936 /** 937 * Returns a collection containing threads that may be waiting to 938 * acquire in shared mode. This has the same properties 939 * as {@link #getQueuedThreads} except that it only returns 940 * those threads waiting due to a shared acquire. 941 * 942 * @return the collection of threads 943 */ getSharedQueuedThreads()944 public final Collection<Thread> getSharedQueuedThreads() { 945 ArrayList<Thread> list = new ArrayList<>(); 946 for (Node p = tail; p != null; p = p.prev) { 947 if (p instanceof SharedNode) { 948 Thread t = p.waiter; 949 if (t != null) 950 list.add(t); 951 } 952 } 953 return list; 954 } 955 956 /** 957 * Returns a string identifying this synchronizer, as well as its state. 958 * The state, in brackets, includes the String {@code "State ="} 959 * followed by the current value of {@link #getState}, and either 960 * {@code "nonempty"} or {@code "empty"} depending on whether the 961 * queue is empty. 962 * 963 * @return a string identifying this synchronizer, as well as its state 964 */ toString()965 public String toString() { 966 return super.toString() 967 + "[State = " + getState() + ", " 968 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; 969 } 970 971 // Instrumentation methods for conditions 972 973 /** 974 * Queries whether the given ConditionObject 975 * uses this synchronizer as its lock. 976 * 977 * @param condition the condition 978 * @return {@code true} if owned 979 * @throws NullPointerException if the condition is null 980 */ owns(ConditionObject condition)981 public final boolean owns(ConditionObject condition) { 982 return condition.isOwnedBy(this); 983 } 984 985 /** 986 * Queries whether any threads are waiting on the given condition 987 * associated with this synchronizer. Note that because timeouts 988 * and interrupts may occur at any time, a {@code true} return 989 * does not guarantee that a future {@code signal} will awaken 990 * any threads. This method is designed primarily for use in 991 * monitoring of the system state. 992 * 993 * @param condition the condition 994 * @return {@code true} if there are any waiting threads 995 * @throws IllegalMonitorStateException if exclusive synchronization 996 * is not held 997 * @throws IllegalArgumentException if the given condition is 998 * not associated with this synchronizer 999 * @throws NullPointerException if the condition is null 1000 */ hasWaiters(ConditionObject condition)1001 public final boolean hasWaiters(ConditionObject condition) { 1002 if (!owns(condition)) 1003 throw new IllegalArgumentException("Not owner"); 1004 return condition.hasWaiters(); 1005 } 1006 1007 /** 1008 * Returns an estimate of the number of threads waiting on the 1009 * given condition associated with this synchronizer. Note that 1010 * because timeouts and interrupts may occur at any time, the 1011 * estimate serves only as an upper bound on the actual number of 1012 * waiters. This method is designed for use in monitoring system 1013 * state, not for synchronization control. 1014 * 1015 * @param condition the condition 1016 * @return the estimated number of waiting threads 1017 * @throws IllegalMonitorStateException if exclusive synchronization 1018 * is not held 1019 * @throws IllegalArgumentException if the given condition is 1020 * not associated with this synchronizer 1021 * @throws NullPointerException if the condition is null 1022 */ getWaitQueueLength(ConditionObject condition)1023 public final int getWaitQueueLength(ConditionObject condition) { 1024 if (!owns(condition)) 1025 throw new IllegalArgumentException("Not owner"); 1026 return condition.getWaitQueueLength(); 1027 } 1028 1029 /** 1030 * Returns a collection containing those threads that may be 1031 * waiting on the given condition associated with this 1032 * synchronizer. Because the actual set of threads may change 1033 * dynamically while constructing this result, the returned 1034 * collection is only a best-effort estimate. The elements of the 1035 * returned collection are in no particular order. 1036 * 1037 * @param condition the condition 1038 * @return the collection of threads 1039 * @throws IllegalMonitorStateException if exclusive synchronization 1040 * is not held 1041 * @throws IllegalArgumentException if the given condition is 1042 * not associated with this synchronizer 1043 * @throws NullPointerException if the condition is null 1044 */ getWaitingThreads(ConditionObject condition)1045 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1046 if (!owns(condition)) 1047 throw new IllegalArgumentException("Not owner"); 1048 return condition.getWaitingThreads(); 1049 } 1050 1051 /** 1052 * Condition implementation for a {@link AbstractQueuedLongSynchronizer} 1053 * serving as the basis of a {@link Lock} implementation. 1054 * 1055 * <p>Method documentation for this class describes mechanics, 1056 * not behavioral specifications from the point of view of Lock 1057 * and Condition users. Exported versions of this class will in 1058 * general need to be accompanied by documentation describing 1059 * condition semantics that rely on those of the associated 1060 * {@code AbstractQueuedLongSynchronizer}. 1061 * 1062 * <p>This class is Serializable, but all fields are transient, 1063 * so deserialized conditions have no waiters. 1064 */ 1065 public class ConditionObject implements Condition, java.io.Serializable { 1066 private static final long serialVersionUID = 1173984872572414699L; 1067 /** First node of condition queue. */ 1068 private transient ConditionNode firstWaiter; 1069 /** Last node of condition queue. */ 1070 private transient ConditionNode lastWaiter; 1071 1072 /** 1073 * Creates a new {@code ConditionObject} instance. 1074 */ ConditionObject()1075 public ConditionObject() { } 1076 1077 // Signalling methods 1078 1079 /** 1080 * Removes and transfers one or all waiters to sync queue. 1081 */ doSignal(ConditionNode first, boolean all)1082 private void doSignal(ConditionNode first, boolean all) { 1083 while (first != null) { 1084 ConditionNode next = first.nextWaiter; 1085 if ((firstWaiter = next) == null) 1086 lastWaiter = null; 1087 if ((first.getAndUnsetStatus(COND) & COND) != 0) { 1088 enqueue(first); 1089 if (!all) 1090 break; 1091 } 1092 first = next; 1093 } 1094 } 1095 1096 /** 1097 * Moves the longest-waiting thread, if one exists, from the 1098 * wait queue for this condition to the wait queue for the 1099 * owning lock. 1100 * 1101 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1102 * returns {@code false} 1103 */ signal()1104 public final void signal() { 1105 ConditionNode first = firstWaiter; 1106 if (!isHeldExclusively()) 1107 throw new IllegalMonitorStateException(); 1108 if (first != null) 1109 doSignal(first, false); 1110 } 1111 1112 /** 1113 * Moves all threads from the wait queue for this condition to 1114 * the wait queue for the owning lock. 1115 * 1116 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1117 * returns {@code false} 1118 */ signalAll()1119 public final void signalAll() { 1120 ConditionNode first = firstWaiter; 1121 if (!isHeldExclusively()) 1122 throw new IllegalMonitorStateException(); 1123 if (first != null) 1124 doSignal(first, true); 1125 } 1126 1127 // Waiting methods 1128 1129 /** 1130 * Adds node to condition list and releases lock. 1131 * 1132 * @param node the node 1133 * @return savedState to reacquire after wait 1134 */ enableWait(ConditionNode node)1135 private long enableWait(ConditionNode node) { 1136 if (isHeldExclusively()) { 1137 node.waiter = Thread.currentThread(); 1138 node.setStatusRelaxed(COND | WAITING); 1139 ConditionNode last = lastWaiter; 1140 if (last == null) 1141 firstWaiter = node; 1142 else 1143 last.nextWaiter = node; 1144 lastWaiter = node; 1145 long savedState = getState(); 1146 if (release(savedState)) 1147 return savedState; 1148 } 1149 node.status = CANCELLED; // lock not held or inconsistent 1150 throw new IllegalMonitorStateException(); 1151 } 1152 1153 /** 1154 * Returns true if a node that was initially placed on a condition 1155 * queue is now ready to reacquire on sync queue. 1156 * @param node the node 1157 * @return true if is reacquiring 1158 */ canReacquire(ConditionNode node)1159 private boolean canReacquire(ConditionNode node) { 1160 // check links, not status to avoid enqueue race 1161 return node != null && node.prev != null && isEnqueued(node); 1162 } 1163 1164 /** 1165 * Unlinks the given node and other non-waiting nodes from 1166 * condition queue unless already unlinked. 1167 */ unlinkCancelledWaiters(ConditionNode node)1168 private void unlinkCancelledWaiters(ConditionNode node) { 1169 if (node == null || node.nextWaiter != null || node == lastWaiter) { 1170 ConditionNode w = firstWaiter, trail = null; 1171 while (w != null) { 1172 ConditionNode next = w.nextWaiter; 1173 if ((w.status & COND) == 0) { 1174 w.nextWaiter = null; 1175 if (trail == null) 1176 firstWaiter = next; 1177 else 1178 trail.nextWaiter = next; 1179 if (next == null) 1180 lastWaiter = trail; 1181 } else 1182 trail = w; 1183 w = next; 1184 } 1185 } 1186 } 1187 1188 /** 1189 * Implements uninterruptible condition wait. 1190 * <ol> 1191 * <li>Save lock state returned by {@link #getState}. 1192 * <li>Invoke {@link #release} with saved state as argument, 1193 * throwing IllegalMonitorStateException if it fails. 1194 * <li>Block until signalled. 1195 * <li>Reacquire by invoking specialized version of 1196 * {@link #acquire} with saved state as argument. 1197 * </ol> 1198 */ awaitUninterruptibly()1199 public final void awaitUninterruptibly() { 1200 ConditionNode node = new ConditionNode(); 1201 long savedState = enableWait(node); 1202 LockSupport.setCurrentBlocker(this); // for back-compatibility 1203 boolean interrupted = false, rejected = false; 1204 while (!canReacquire(node)) { 1205 if (Thread.interrupted()) 1206 interrupted = true; 1207 else if ((node.status & COND) != 0) { 1208 try { 1209 if (rejected) 1210 node.block(); 1211 else 1212 ForkJoinPool.managedBlock(node); 1213 } catch (RejectedExecutionException ex) { 1214 rejected = true; 1215 } catch (InterruptedException ie) { 1216 interrupted = true; 1217 } 1218 } else 1219 Thread.onSpinWait(); // awoke while enqueuing 1220 } 1221 LockSupport.setCurrentBlocker(null); 1222 node.clearStatus(); 1223 acquire(node, savedState, false, false, false, 0L); 1224 if (interrupted) 1225 Thread.currentThread().interrupt(); 1226 } 1227 1228 /** 1229 * Implements interruptible condition wait. 1230 * <ol> 1231 * <li>If current thread is interrupted, throw InterruptedException. 1232 * <li>Save lock state returned by {@link #getState}. 1233 * <li>Invoke {@link #release} with saved state as argument, 1234 * throwing IllegalMonitorStateException if it fails. 1235 * <li>Block until signalled or interrupted. 1236 * <li>Reacquire by invoking specialized version of 1237 * {@link #acquire} with saved state as argument. 1238 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1239 * </ol> 1240 */ await()1241 public final void await() throws InterruptedException { 1242 if (Thread.interrupted()) 1243 throw new InterruptedException(); 1244 ConditionNode node = new ConditionNode(); 1245 long savedState = enableWait(node); 1246 LockSupport.setCurrentBlocker(this); // for back-compatibility 1247 boolean interrupted = false, cancelled = false, rejected = false; 1248 while (!canReacquire(node)) { 1249 if (interrupted |= Thread.interrupted()) { 1250 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1251 break; // else interrupted after signal 1252 } else if ((node.status & COND) != 0) { 1253 try { 1254 if (rejected) 1255 node.block(); 1256 else 1257 ForkJoinPool.managedBlock(node); 1258 } catch (RejectedExecutionException ex) { 1259 rejected = true; 1260 } catch (InterruptedException ie) { 1261 interrupted = true; 1262 } 1263 } else 1264 Thread.onSpinWait(); // awoke while enqueuing 1265 } 1266 LockSupport.setCurrentBlocker(null); 1267 node.clearStatus(); 1268 acquire(node, savedState, false, false, false, 0L); 1269 if (interrupted) { 1270 if (cancelled) { 1271 unlinkCancelledWaiters(node); 1272 throw new InterruptedException(); 1273 } 1274 Thread.currentThread().interrupt(); 1275 } 1276 } 1277 1278 /** 1279 * Implements timed condition wait. 1280 * <ol> 1281 * <li>If current thread is interrupted, throw InterruptedException. 1282 * <li>Save lock state returned by {@link #getState}. 1283 * <li>Invoke {@link #release} with saved state as argument, 1284 * throwing IllegalMonitorStateException if it fails. 1285 * <li>Block until signalled, interrupted, or timed out. 1286 * <li>Reacquire by invoking specialized version of 1287 * {@link #acquire} with saved state as argument. 1288 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1289 * </ol> 1290 */ awaitNanos(long nanosTimeout)1291 public final long awaitNanos(long nanosTimeout) 1292 throws InterruptedException { 1293 if (Thread.interrupted()) 1294 throw new InterruptedException(); 1295 ConditionNode node = new ConditionNode(); 1296 long savedState = enableWait(node); 1297 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1298 long deadline = System.nanoTime() + nanos; 1299 boolean cancelled = false, interrupted = false; 1300 while (!canReacquire(node)) { 1301 if ((interrupted |= Thread.interrupted()) || 1302 (nanos = deadline - System.nanoTime()) <= 0L) { 1303 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1304 break; 1305 } else 1306 LockSupport.parkNanos(this, nanos); 1307 } 1308 node.clearStatus(); 1309 acquire(node, savedState, false, false, false, 0L); 1310 if (cancelled) { 1311 unlinkCancelledWaiters(node); 1312 if (interrupted) 1313 throw new InterruptedException(); 1314 } else if (interrupted) 1315 Thread.currentThread().interrupt(); 1316 long remaining = deadline - System.nanoTime(); // avoid overflow 1317 return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; 1318 } 1319 1320 /** 1321 * Implements absolute timed condition wait. 1322 * <ol> 1323 * <li>If current thread is interrupted, throw InterruptedException. 1324 * <li>Save lock state returned by {@link #getState}. 1325 * <li>Invoke {@link #release} with saved state as argument, 1326 * throwing IllegalMonitorStateException if it fails. 1327 * <li>Block until signalled, interrupted, or timed out. 1328 * <li>Reacquire by invoking specialized version of 1329 * {@link #acquire} with saved state as argument. 1330 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1331 * <li>If timed out while blocked in step 4, return false, else true. 1332 * </ol> 1333 */ awaitUntil(Date deadline)1334 public final boolean awaitUntil(Date deadline) 1335 throws InterruptedException { 1336 long abstime = deadline.getTime(); 1337 if (Thread.interrupted()) 1338 throw new InterruptedException(); 1339 ConditionNode node = new ConditionNode(); 1340 long savedState = enableWait(node); 1341 boolean cancelled = false, interrupted = false; 1342 while (!canReacquire(node)) { 1343 if ((interrupted |= Thread.interrupted()) || 1344 System.currentTimeMillis() >= abstime) { 1345 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1346 break; 1347 } else 1348 LockSupport.parkUntil(this, abstime); 1349 } 1350 node.clearStatus(); 1351 acquire(node, savedState, false, false, false, 0L); 1352 if (cancelled) { 1353 unlinkCancelledWaiters(node); 1354 if (interrupted) 1355 throw new InterruptedException(); 1356 } else if (interrupted) 1357 Thread.currentThread().interrupt(); 1358 return !cancelled; 1359 } 1360 1361 /** 1362 * Implements timed condition wait. 1363 * <ol> 1364 * <li>If current thread is interrupted, throw InterruptedException. 1365 * <li>Save lock state returned by {@link #getState}. 1366 * <li>Invoke {@link #release} with saved state as argument, 1367 * throwing IllegalMonitorStateException if it fails. 1368 * <li>Block until signalled, interrupted, or timed out. 1369 * <li>Reacquire by invoking specialized version of 1370 * {@link #acquire} with saved state as argument. 1371 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1372 * <li>If timed out while blocked in step 4, return false, else true. 1373 * </ol> 1374 */ await(long time, TimeUnit unit)1375 public final boolean await(long time, TimeUnit unit) 1376 throws InterruptedException { 1377 long nanosTimeout = unit.toNanos(time); 1378 if (Thread.interrupted()) 1379 throw new InterruptedException(); 1380 ConditionNode node = new ConditionNode(); 1381 long savedState = enableWait(node); 1382 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1383 long deadline = System.nanoTime() + nanos; 1384 boolean cancelled = false, interrupted = false; 1385 while (!canReacquire(node)) { 1386 if ((interrupted |= Thread.interrupted()) || 1387 (nanos = deadline - System.nanoTime()) <= 0L) { 1388 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1389 break; 1390 } else 1391 LockSupport.parkNanos(this, nanos); 1392 } 1393 node.clearStatus(); 1394 acquire(node, savedState, false, false, false, 0L); 1395 if (cancelled) { 1396 unlinkCancelledWaiters(node); 1397 if (interrupted) 1398 throw new InterruptedException(); 1399 } else if (interrupted) 1400 Thread.currentThread().interrupt(); 1401 return !cancelled; 1402 } 1403 1404 // support for instrumentation 1405 1406 /** 1407 * Returns true if this condition was created by the given 1408 * synchronization object. 1409 * 1410 * @return {@code true} if owned 1411 */ isOwnedBy(AbstractQueuedLongSynchronizer sync)1412 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) { 1413 return sync == AbstractQueuedLongSynchronizer.this; 1414 } 1415 1416 /** 1417 * Queries whether any threads are waiting on this condition. 1418 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}. 1419 * 1420 * @return {@code true} if there are any waiting threads 1421 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1422 * returns {@code false} 1423 */ hasWaiters()1424 protected final boolean hasWaiters() { 1425 if (!isHeldExclusively()) 1426 throw new IllegalMonitorStateException(); 1427 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1428 if ((w.status & COND) != 0) 1429 return true; 1430 } 1431 return false; 1432 } 1433 1434 /** 1435 * Returns an estimate of the number of threads waiting on 1436 * this condition. 1437 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}. 1438 * 1439 * @return the estimated number of waiting threads 1440 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1441 * returns {@code false} 1442 */ getWaitQueueLength()1443 protected final int getWaitQueueLength() { 1444 if (!isHeldExclusively()) 1445 throw new IllegalMonitorStateException(); 1446 int n = 0; 1447 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1448 if ((w.status & COND) != 0) 1449 ++n; 1450 } 1451 return n; 1452 } 1453 1454 /** 1455 * Returns a collection containing those threads that may be 1456 * waiting on this Condition. 1457 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}. 1458 * 1459 * @return the collection of threads 1460 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1461 * returns {@code false} 1462 */ getWaitingThreads()1463 protected final Collection<Thread> getWaitingThreads() { 1464 if (!isHeldExclusively()) 1465 throw new IllegalMonitorStateException(); 1466 ArrayList<Thread> list = new ArrayList<>(); 1467 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1468 if ((w.status & COND) != 0) { 1469 Thread t = w.waiter; 1470 if (t != null) 1471 list.add(t); 1472 } 1473 } 1474 return list; 1475 } 1476 } 1477 1478 // Unsafe 1479 private static final Unsafe U = Unsafe.getUnsafe(); 1480 private static final long STATE 1481 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state"); 1482 private static final long HEAD 1483 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head"); 1484 private static final long TAIL 1485 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail"); 1486 1487 static { 1488 Class<?> ensureLoaded = LockSupport.class; 1489 } 1490 } 1491