/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
 * A reusable synchronization barrier, similar in functionality to
 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
 * but supporting more flexible usage.
 *
 * <p><b>Registration.</b> Unlike the case for other barriers, the
 * number of parties <em>registered</em> to synchronize on a phaser
 * may vary over time. Tasks may be registered at any time (using
 * methods {@link #register}, {@link #bulkRegister}, or forms of
 * constructors establishing initial numbers of parties), and
 * optionally deregistered upon any arrival (using {@link
 * #arriveAndDeregister}). As is the case with most basic
 * synchronization constructs, registration and deregistration affect
 * only internal counts; they do not establish any further internal
 * bookkeeping, so tasks cannot query whether they are registered.
 * (However, you can introduce such bookkeeping by subclassing this
 * class.)
 *
 * <p><b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
 * Phaser} may be repeatedly awaited. Method {@link
 * #arriveAndAwaitAdvance} has effect analogous to {@link
 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
 * generation of a phaser has an associated phase number. The phase
 * number starts at zero, and advances when all parties arrive at the
 * phaser, wrapping around to zero after reaching {@code
 * Integer.MAX_VALUE}.
 * The use of phase numbers enables independent
 * control of actions upon arrival at a phaser and upon awaiting
 * others, via two kinds of methods that may be invoked by any
 * registered party:
 *
 * <ul>
 *
 *   <li> <b>Arrival.</b> Methods {@link #arrive} and
 *       {@link #arriveAndDeregister} record arrival. These methods
 *       do not block, but return an associated <em>arrival phase
 *       number</em>; that is, the phase number of the phaser to which
 *       the arrival applied. When the final party for a given phase
 *       arrives, an optional action is performed and the phase
 *       advances. These actions are performed by the party
 *       triggering a phase advance, and are arranged by overriding
 *       method {@link #onAdvance(int, int)}, which also controls
 *       termination. Overriding this method is similar to, but more
 *       flexible than, providing a barrier action to a {@code
 *       CyclicBarrier}.
 *
 *   <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
 *       argument indicating an arrival phase number, and returns when
 *       the phaser advances to (or is already at) a different phase.
 *       Unlike similar constructions using {@code CyclicBarrier},
 *       method {@code awaitAdvance} continues to wait even if the
 *       waiting thread is interrupted. Interruptible and timeout
 *       versions are also available, but exceptions encountered while
 *       tasks wait interruptibly or with timeout do not change the
 *       state of the phaser. If necessary, you can perform any
 *       associated recovery within handlers of those exceptions,
 *       often after invoking {@code forceTermination}. Phasers may
 *       also be used by tasks executing in a {@link ForkJoinPool},
 *       which will ensure sufficient parallelism to execute tasks
 *       when others are blocked waiting for a phase to advance.
 *
 * </ul>
 *
 * <p><b>Termination.</b> A phaser may enter a <em>termination</em>
 * state, that may be checked using method {@link #isTerminated}.
 * Upon termination, all synchronization methods immediately return
 * without waiting for advance, as indicated by a negative return value.
 * Similarly, attempts to register upon termination have no effect.
 * Termination is triggered when an invocation of {@code onAdvance}
 * returns {@code true}. The default implementation returns {@code
 * true} if a deregistration has caused the number of registered
 * parties to become zero. As illustrated below, when phasers control
 * actions with a fixed number of iterations, it is often convenient
 * to override this method to cause termination when the current phase
 * number reaches a threshold. Method {@link #forceTermination} is
 * also available to abruptly release waiting threads and allow them
 * to terminate.
 *
 * <p><b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
 * constructed in tree structures) to reduce contention. Phasers with
 * large numbers of parties that would otherwise experience heavy
 * synchronization contention costs may instead be set up so that
 * groups of sub-phasers share a common parent. This may greatly
 * increase throughput even though it incurs greater per-operation
 * overhead.
 *
 * <p>In a tree of tiered phasers, registration and deregistration of
 * child phasers with their parent are managed automatically.
 * Whenever the number of registered parties of a child phaser becomes
 * non-zero (as established in the {@link #Phaser(Phaser,int)}
 * constructor, {@link #register}, or {@link #bulkRegister}), the
 * child phaser is registered with its parent. Whenever the number of
 * registered parties becomes zero as the result of an invocation of
 * {@link #arriveAndDeregister}, the child phaser is deregistered
 * from its parent.
 *
 * <p><b>Monitoring.</b> While synchronization methods may be invoked
 * only by registered parties, the current state of a phaser may be
 * monitored by any caller.
 * At any given moment there are {@link
 * #getRegisteredParties} parties in total, of which {@link
 * #getArrivedParties} have arrived at the current phase ({@link
 * #getPhase}). When the remaining ({@link #getUnarrivedParties})
 * parties arrive, the phase advances. The values returned by these
 * methods may reflect transient states and so are not in general
 * useful for synchronization control. Method {@link #toString}
 * returns snapshots of these state queries in a form convenient for
 * informal monitoring.
 *
 * <p><b>Sample usages:</b>
 *
 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
 * to control a one-shot action serving a variable number of parties.
 * The typical idiom is for the method setting this up to first
 * register, then start the actions, then deregister, as in:
 *
 *  <pre> {@code
 * void runTasks(List<Runnable> tasks) {
 *   final Phaser phaser = new Phaser(1); // "1" to register self
 *   // create and start threads
 *   for (final Runnable task : tasks) {
 *     phaser.register();
 *     new Thread() {
 *       public void run() {
 *         phaser.arriveAndAwaitAdvance(); // await all creation
 *         task.run();
 *       }
 *     }.start();
 *   }
 *
 *   // allow threads to start and deregister self
 *   phaser.arriveAndDeregister();
 * }}</pre>
 *
 * <p>One way to cause a set of threads to repeatedly perform actions
 * for a given number of iterations is to override {@code onAdvance}:
 *
 *  <pre> {@code
 * void startTasks(List<Runnable> tasks, final int iterations) {
 *   final Phaser phaser = new Phaser() {
 *     protected boolean onAdvance(int phase, int registeredParties) {
 *       return phase >= iterations || registeredParties == 0;
 *     }
 *   };
 *   phaser.register();
 *   for (final Runnable task : tasks) {
 *     phaser.register();
 *     new Thread() {
 *       public void run() {
 *         do {
 *           task.run();
 *           phaser.arriveAndAwaitAdvance();
 *         } while (!phaser.isTerminated());
 *       }
 *     }.start();
 *   }
 *   phaser.arriveAndDeregister(); // deregister self, don't wait
 * }}</pre>
 *
 * If the main task must later await termination, it
 * may re-register and then execute a similar loop:
 *  <pre> {@code
 *   // ...
 *   phaser.register();
 *   while (!phaser.isTerminated())
 *     phaser.arriveAndAwaitAdvance();}</pre>
 *
 * <p>Related constructions may be used to await particular phase numbers
 * in contexts where you are sure that the phase will never wrap around
 * {@code Integer.MAX_VALUE}. For example:
 *
 *  <pre> {@code
 * void awaitPhase(Phaser phaser, int phase) {
 *   int p = phaser.register(); // assumes caller not already registered
 *   while (p < phase) {
 *     if (phaser.isTerminated())
 *       // ... deal with unexpected termination
 *     else
 *       p = phaser.arriveAndAwaitAdvance();
 *   }
 *   phaser.arriveAndDeregister();
 * }}</pre>
 *
 * <p>To create a set of {@code n} tasks using a tree of phasers, you
 * could use code of the following form, assuming a Task class with a
 * constructor accepting a {@code Phaser} that it registers with upon
 * construction. After invocation of {@code build(new Task[n], 0, n,
 * new Phaser())}, these tasks could then be started, for example by
 * submitting to a pool:
 *
 *  <pre> {@code
 * void build(Task[] tasks, int lo, int hi, Phaser ph) {
 *   if (hi - lo > TASKS_PER_PHASER) {
 *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
 *       int j = Math.min(i + TASKS_PER_PHASER, hi);
 *       build(tasks, i, j, new Phaser(ph));
 *     }
 *   } else {
 *     for (int i = lo; i < hi; ++i)
 *       tasks[i] = new Task(ph);
 *       // assumes new Task(ph) performs ph.register()
 *   }
 * }}</pre>
 *
 * The best value of {@code TASKS_PER_PHASER} depends mainly on
 * expected synchronization rates.
 * A value as low as four may be appropriate for extremely small
 * per-phase task bodies (thus high rates), or up to hundreds for
 * extremely large ones.
 *
 * <p><b>Implementation notes</b>: This implementation restricts the
 * maximum number of parties to 65535. Attempts to register additional
 * parties result in {@code IllegalStateException}. However, you can and
 * should create tiered phasers to accommodate arbitrarily large sets
 * of participants.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class Phaser {
    /*
     * This class implements an extension of X10 "clocks". Thanks to
     * Vijay Saraswat for the idea, and to Vivek Sarkar for
     * enhancements to extend functionality.
     */

    /**
     * Primary state representation, holding four bit-fields:
     *
     * unarrived  -- the number of parties yet to hit barrier (bits  0-15)
     * parties    -- the number of parties to wait            (bits 16-31)
     * phase      -- the generation of the barrier            (bits 32-62)
     * terminated -- set if barrier is terminated             (bit  63 / sign)
     *
     * Except that a phaser with no registered parties is
     * distinguished by the otherwise illegal state of having zero
     * parties and one unarrived party (encoded as EMPTY below).
     *
     * To efficiently maintain atomicity, these values are packed into
     * a single (atomic) long. Good performance relies on keeping
     * state decoding and encoding simple, and keeping race windows
     * short.
     *
     * All state updates are performed via CAS except initial
     * registration of a sub-phaser (i.e., one with a non-null
     * parent). In this (relatively rare) case, we use built-in
     * synchronization to lock while first registering with its
     * parent.
     *
     * The phase of a subphaser is allowed to lag that of its
     * ancestors until it is actually accessed -- see method
     * reconcileState.
     */
    private volatile long state;

    private static final int  MAX_PARTIES     = 0xffff;
    private static final int  MAX_PHASE       = Integer.MAX_VALUE;
    private static final int  PARTIES_SHIFT   = 16;
    private static final int  PHASE_SHIFT     = 32;
    private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
    private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
    private static final long COUNTS_MASK     = 0xffffffffL;
    private static final long TERMINATION_BIT = 1L << 63;

    // some special values
    private static final int  ONE_ARRIVAL     = 1;
    private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
    private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
    private static final int  EMPTY           = 1;

    // The following unpacking methods are usually manually inlined

    private static int unarrivedOf(long s) {
        int counts = (int)s;
        return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
    }

    private static int partiesOf(long s) {
        return (int)s >>> PARTIES_SHIFT;
    }

    private static int phaseOf(long s) {
        return (int)(s >>> PHASE_SHIFT);
    }

    private static int arrivedOf(long s) {
        int counts = (int)s;
        return (counts == EMPTY) ? 0 :
            (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
    }

    /**
     * The parent of this phaser, or null if none
     */
    private final Phaser parent;

    /**
     * The root of phaser tree. Equals this if not in a tree.
     */
    private final Phaser root;

    /**
     * Heads of Treiber stacks for waiting threads. To eliminate
     * contention when releasing some threads while adding others, we
     * use two of them, alternating across even and odd phases.
     * Subphasers share queues with root to speed up releases.
     */
    private final AtomicReference<QNode> evenQ;
    private final AtomicReference<QNode> oddQ;

    private AtomicReference<QNode> queueFor(int phase) {
        return ((phase & 1) == 0) ? evenQ : oddQ;
    }

    /**
     * Returns message string for bounds exceptions on arrival.
     */
    private String badArrive(long s) {
        return "Attempted arrival of unregistered party for " +
            stateToString(s);
    }

    /**
     * Returns message string for bounds exceptions on registration.
     */
    private String badRegister(long s) {
        return "Attempt to register more than " +
            MAX_PARTIES + " parties for " + stateToString(s);
    }

    /**
     * Main implementation for methods arrive and arriveAndDeregister.
     * Manually tuned to speed up and minimize race windows for the
     * common case of just decrementing unarrived field.
     *
     * @param adjust value to subtract from state;
     *               ONE_ARRIVAL for arrive,
     *               ONE_DEREGISTER for arriveAndDeregister
     */
    private int doArrive(int adjust) {
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ?
                0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
                if (unarrived == 1) {
                    long n = s & PARTIES_MASK;  // base of next state
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    if (root == this) {
                        if (onAdvance(phase, nextUnarrived))
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            n |= EMPTY;
                        else
                            n |= nextUnarrived;
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        n |= (long)nextPhase << PHASE_SHIFT;
                        UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                        releaseWaiters(phase);
                    }
                    else if (nextUnarrived == 0) { // propagate deregistration
                        phase = parent.doArrive(ONE_DEREGISTER);
                        UNSAFE.compareAndSwapLong(this, stateOffset,
                                                  s, s | EMPTY);
                    }
                    else
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                return phase;
            }
        }
    }

    /**
     * Implementation of register, bulkRegister
     *
     * @param registrations number to add to both parties and
     * unarrived fields. Must be greater than zero.
     */
    private int doRegister(int registrations) {
        // adjustment to state
        long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
        final Phaser parent = this.parent;
        int phase;
        for (;;) {
            long s = (parent == null) ?
                state : reconcileState();
            int counts = (int)s;
            int parties = counts >>> PARTIES_SHIFT;
            int unarrived = counts & UNARRIVED_MASK;
            if (registrations > MAX_PARTIES - parties)
                throw new IllegalStateException(badRegister(s));
            phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                break;
            if (counts != EMPTY) {                  // not 1st registration
                if (parent == null || reconcileState() == s) {
                    if (unarrived == 0)             // wait out advance
                        root.internalAwaitAdvance(phase, null);
                    else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                       s, s + adjust))
                        break;
                }
            }
            else if (parent == null) {              // 1st root registration
                long next = ((long)phase << PHASE_SHIFT) | adjust;
                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                    break;
            }
            else {
                synchronized (this) {               // 1st sub registration
                    if (state == s) {               // recheck under lock
                        phase = parent.doRegister(1);
                        if (phase < 0)
                            break;
                        // finish registration whenever parent registration
                        // succeeded, even when racing with termination,
                        // since these are part of the same "transaction".
                        while (!UNSAFE.compareAndSwapLong
                               (this, stateOffset, s,
                                ((long)phase << PHASE_SHIFT) | adjust)) {
                            s = state;
                            phase = (int)(root.state >>> PHASE_SHIFT);
                            // assert (int)s == EMPTY;
                        }
                        break;
                    }
                }
            }
        }
        return phase;
    }

    /**
     * Resolves lagged phase propagation from root if necessary.
     * Reconciliation normally occurs when root has advanced but
     * subphasers have not yet done so, in which case they must finish
     * their own advance by setting unarrived to parties (or if
     * parties is zero, resetting to unregistered EMPTY state).
     *
     * @return reconciled state
     */
    private long reconcileState() {
        final Phaser root = this.root;
        long s = state;
        if (root != this) {
            int phase, p;
            // CAS to root phase with current parties, tripping unarrived
            while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
                   (int)(s >>> PHASE_SHIFT) &&
                   !UNSAFE.compareAndSwapLong
                   (this, stateOffset, s,
                    s = (((long)phase << PHASE_SHIFT) |
                         ((phase < 0) ? (s & COUNTS_MASK) :
                          (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                           ((s & PARTIES_MASK) | p))))))
                s = state;
        }
        return s;
    }

    /**
     * Creates a new phaser with no initially registered parties, no
     * parent, and initial phase number 0. Any thread using this
     * phaser will need to first register for it.
     */
    public Phaser() {
        this(null, 0);
    }

    /**
     * Creates a new phaser with the given number of registered
     * unarrived parties, no parent, and initial phase number 0.
     *
     * @param parties the number of parties required to advance to the
     * next phase
     * @throws IllegalArgumentException if parties less than zero
     * or greater than the maximum number of parties supported
     */
    public Phaser(int parties) {
        this(null, parties);
    }

    /**
     * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
     *
     * @param parent the parent phaser
     */
    public Phaser(Phaser parent) {
        this(parent, 0);
    }

    /**
     * Creates a new phaser with the given parent and number of
     * registered unarrived parties. When the given parent is non-null
     * and the given number of parties is greater than zero, this
     * child phaser is registered with its parent.
     *
     * @param parent the parent phaser
     * @param parties the number of parties required to advance to the
     * next phase
     * @throws IllegalArgumentException if parties less than zero
     * or greater than the maximum number of parties supported
     */
    public Phaser(Phaser parent, int parties) {
        if (parties >>> PARTIES_SHIFT != 0)
            throw new IllegalArgumentException("Illegal number of parties");
        int phase = 0;
        this.parent = parent;
        if (parent != null) {
            final Phaser root = parent.root;
            this.root = root;
            this.evenQ = root.evenQ;
            this.oddQ = root.oddQ;
            if (parties != 0)
                phase = parent.doRegister(1);
        }
        else {
            this.root = this;
            this.evenQ = new AtomicReference<QNode>();
            this.oddQ = new AtomicReference<QNode>();
        }
        this.state = (parties == 0) ? (long)EMPTY :
            ((long)phase << PHASE_SHIFT) |
            ((long)parties << PARTIES_SHIFT) |
            ((long)parties);
    }

    /**
     * Adds a new unarrived party to this phaser. If an ongoing
     * invocation of {@link #onAdvance} is in progress, this method
     * may await its completion before returning. If this phaser has
     * a parent, and this phaser previously had no registered parties,
     * this child phaser is also registered with its parent. If
     * this phaser is terminated, the attempt to register has
     * no effect, and a negative value is returned.
     *
     * @return the arrival phase number to which this registration
     * applied. If this value is negative, then this phaser has
     * terminated, in which case registration has no effect.
     * @throws IllegalStateException if attempting to register more
     * than the maximum supported number of parties
     */
    public int register() {
        return doRegister(1);
    }

    /**
     * Adds the given number of new unarrived parties to this phaser.
     * If an ongoing invocation of {@link #onAdvance} is in progress,
     * this method may await its completion before returning. If this
     * phaser has a parent, and the given number of parties is greater
     * than zero, and this phaser previously had no registered
     * parties, this child phaser is also registered with its parent.
     * If this phaser is terminated, the attempt to register has no
     * effect, and a negative value is returned.
     *
     * @param parties the number of additional parties required to
     * advance to the next phase
     * @return the arrival phase number to which this registration
     * applied. If this value is negative, then this phaser has
     * terminated, in which case registration has no effect.
     * @throws IllegalStateException if attempting to register more
     * than the maximum supported number of parties
     * @throws IllegalArgumentException if {@code parties < 0}
     */
    public int bulkRegister(int parties) {
        if (parties < 0)
            throw new IllegalArgumentException();
        if (parties == 0)
            return getPhase();
        return doRegister(parties);
    }

    /**
     * Arrives at this phaser, without waiting for others to arrive.
     *
     * <p>It is a usage error for an unregistered party to invoke this
     * method. However, this error may result in an {@code
     * IllegalStateException} only upon some subsequent operation on
     * this phaser, if ever.
     *
     * @return the arrival phase number, or a negative value if terminated
     * @throws IllegalStateException if not terminated and the number
     * of unarrived parties would become negative
     */
    public int arrive() {
        return doArrive(ONE_ARRIVAL);
    }

    /**
     * Arrives at this phaser and deregisters from it without waiting
     * for others to arrive. Deregistration reduces the number of
     * parties required to advance in future phases.
     * If this phaser has a parent, and deregistration causes this phaser
     * to have zero parties, this phaser is also deregistered from its
     * parent.
     *
     * <p>It is a usage error for an unregistered party to invoke this
     * method. However, this error may result in an {@code
     * IllegalStateException} only upon some subsequent operation on
     * this phaser, if ever.
     *
     * @return the arrival phase number, or a negative value if terminated
     * @throws IllegalStateException if not terminated and the number
     * of registered or unarrived parties would become negative
     */
    public int arriveAndDeregister() {
        return doArrive(ONE_DEREGISTER);
    }

    /**
     * Arrives at this phaser and awaits others. Equivalent in effect
     * to {@code awaitAdvance(arrive())}. If you need to await with
     * interruption or timeout, you can arrange this with an analogous
     * construction using one of the other forms of the {@code
     * awaitAdvance} method. If instead you need to deregister upon
     * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
     *
     * <p>It is a usage error for an unregistered party to invoke this
     * method. However, this error may result in an {@code
     * IllegalStateException} only upon some subsequent operation on
     * this phaser, if ever.
     *
     * @return the arrival phase number, or the (negative)
     * {@linkplain #getPhase() current phase} if terminated
     * @throws IllegalStateException if not terminated and the number
     * of unarrived parties would become negative
     */
    public int arriveAndAwaitAdvance() {
        // Specialization of doArrive+awaitAdvance eliminating some reads/paths
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ?
                0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                          s -= ONE_ARRIVAL)) {
                if (unarrived > 1)
                    return root.internalAwaitAdvance(phase, null);
                if (root != this)
                    return parent.arriveAndAwaitAdvance();
                long n = s & PARTIES_MASK;  // base of next state
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (onAdvance(phase, nextUnarrived))
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    n |= EMPTY;
                else
                    n |= nextUnarrived;
                int nextPhase = (phase + 1) & MAX_PHASE;
                n |= (long)nextPhase << PHASE_SHIFT;
                if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                    return (int)(state >>> PHASE_SHIFT); // terminated
                releaseWaiters(phase);
                return nextPhase;
            }
        }
    }

    /**
     * Awaits the phase of this phaser to advance from the given phase
     * value, returning immediately if the current phase is not equal
     * to the given phase value or this phaser is terminated.
     *
     * @param phase an arrival phase number, or negative value if
     * terminated; this argument is normally the value returned by a
     * previous call to {@code arrive} or {@code arriveAndDeregister}.
     * @return the next arrival phase number, or the argument if it is
     * negative, or the (negative) {@linkplain #getPhase() current phase}
     * if terminated
     */
    public int awaitAdvance(int phase) {
        final Phaser root = this.root;
        long s = (root == this) ?
            state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase)
            return root.internalAwaitAdvance(phase, null);
        return p;
    }

    /**
     * Awaits the phase of this phaser to advance from the given phase
     * value, throwing {@code InterruptedException} if interrupted
     * while waiting, or returning immediately if the current phase is
     * not equal to the given phase value or this phaser is
     * terminated.
     *
     * @param phase an arrival phase number, or negative value if
     * terminated; this argument is normally the value returned by a
     * previous call to {@code arrive} or {@code arriveAndDeregister}.
     * @return the next arrival phase number, or the argument if it is
     * negative, or the (negative) {@linkplain #getPhase() current phase}
     * if terminated
     * @throws InterruptedException if thread interrupted while waiting
     */
    public int awaitAdvanceInterruptibly(int phase)
        throws InterruptedException {
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase) {
            QNode node = new QNode(this, phase, true, false, 0L);
            p = root.internalAwaitAdvance(phase, node);
            if (node.wasInterrupted)
                throw new InterruptedException();
        }
        return p;
    }

    /**
     * Awaits the phase of this phaser to advance from the given phase
     * value or the given timeout to elapse, throwing {@code
     * InterruptedException} if interrupted while waiting, or
     * returning immediately if the current phase is not equal to the
     * given phase value or this phaser is terminated.
     *
     * @param phase an arrival phase number, or negative value if
     * terminated; this argument is normally the value returned by a
     * previous call to {@code arrive} or {@code arriveAndDeregister}.
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the next arrival phase number, or the argument if it is
     * negative, or the (negative) {@linkplain #getPhase() current phase}
     * if terminated
     * @throws InterruptedException if thread interrupted while waiting
     * @throws TimeoutException if timed out while waiting
     */
    public int awaitAdvanceInterruptibly(int phase,
                                         long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        long nanos = unit.toNanos(timeout);
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase) {
            QNode node = new QNode(this, phase, true, true, nanos);
            p = root.internalAwaitAdvance(phase, node);
            if (node.wasInterrupted)
                throw new InterruptedException();
            else if (p == phase)
                throw new TimeoutException();
        }
        return p;
    }

    /**
     * Forces this phaser to enter termination state. Counts of
     * registered parties are unaffected. If this phaser is a member
     * of a tiered set of phasers, then all of the phasers in the set
     * are terminated. If this phaser is already terminated, this
     * method has no effect. This method may be useful for
     * coordinating recovery after one or more tasks encounter
     * unexpected exceptions.
     */
    public void forceTermination() {
        // Only need to change root state
        final Phaser root = this.root;
        long s;
        while ((s = root.state) >= 0) {
            if (UNSAFE.compareAndSwapLong(root, stateOffset,
                                          s, s | TERMINATION_BIT)) {
                // signal all threads
                releaseWaiters(0); // Waiters on evenQ
                releaseWaiters(1); // Waiters on oddQ
                return;
            }
        }
    }

    /**
     * Returns the current phase number. The maximum phase number is
     * {@code Integer.MAX_VALUE}, after which it restarts at
     * zero. Upon termination, the phase number is negative,
     * in which case the prevailing phase prior to termination
     * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
     *
     * @return the phase number, or a negative value if terminated
     */
    public final int getPhase() {
        return (int)(root.state >>> PHASE_SHIFT);
    }

    /**
     * Returns the number of parties registered at this phaser.
     *
     * @return the number of parties
     */
    public int getRegisteredParties() {
        return partiesOf(state);
    }

    /**
     * Returns the number of registered parties that have arrived at
     * the current phase of this phaser. If this phaser has terminated,
     * the returned value is meaningless and arbitrary.
     *
     * @return the number of arrived parties
     */
    public int getArrivedParties() {
        return arrivedOf(reconcileState());
    }

    /**
     * Returns the number of registered parties that have not yet
     * arrived at the current phase of this phaser. If this phaser has
     * terminated, the returned value is meaningless and arbitrary.
     *
     * @return the number of unarrived parties
     */
    public int getUnarrivedParties() {
        return unarrivedOf(reconcileState());
    }

    /**
     * Returns the parent of this phaser, or {@code null} if none.
844 * 845 * @return the parent of this phaser, or {@code null} if none 846 */ getParent()847 public Phaser getParent() { 848 return parent; 849 } 850 851 /** 852 * Returns the root ancestor of this phaser, which is the same as 853 * this phaser if it has no parent. 854 * 855 * @return the root ancestor of this phaser 856 */ getRoot()857 public Phaser getRoot() { 858 return root; 859 } 860 861 /** 862 * Returns {@code true} if this phaser has been terminated. 863 * 864 * @return {@code true} if this phaser has been terminated 865 */ isTerminated()866 public boolean isTerminated() { 867 return root.state < 0L; 868 } 869 870 /** 871 * Overridable method to perform an action upon impending phase 872 * advance, and to control termination. This method is invoked 873 * upon arrival of the party advancing this phaser (when all other 874 * waiting parties are dormant). If this method returns {@code 875 * true}, this phaser will be set to a final termination state 876 * upon advance, and subsequent calls to {@link #isTerminated} 877 * will return true. Any (unchecked) Exception or Error thrown by 878 * an invocation of this method is propagated to the party 879 * attempting to advance this phaser, in which case no advance 880 * occurs. 881 * 882 * <p>The arguments to this method provide the state of the phaser 883 * prevailing for the current transition. The effects of invoking 884 * arrival, registration, and waiting methods on this phaser from 885 * within {@code onAdvance} are unspecified and should not be 886 * relied on. 887 * 888 * <p>If this phaser is a member of a tiered set of phasers, then 889 * {@code onAdvance} is invoked only for its root phaser on each 890 * advance. 891 * 892 * <p>To support the most common use cases, the default 893 * implementation of this method returns {@code true} when the 894 * number of registered parties has become zero as the result of a 895 * party invoking {@code arriveAndDeregister}. 
You can disable 896 * this behavior, thus enabling continuation upon future 897 * registrations, by overriding this method to always return 898 * {@code false}: 899 * 900 * <pre> {@code 901 * Phaser phaser = new Phaser() { 902 * protected boolean onAdvance(int phase, int parties) { return false; } 903 * }}</pre> 904 * 905 * @param phase the current phase number on entry to this method, 906 * before this phaser is advanced 907 * @param registeredParties the current number of registered parties 908 * @return {@code true} if this phaser should terminate 909 */ onAdvance(int phase, int registeredParties)910 protected boolean onAdvance(int phase, int registeredParties) { 911 return registeredParties == 0; 912 } 913 914 /** 915 * Returns a string identifying this phaser, as well as its 916 * state. The state, in brackets, includes the String {@code 917 * "phase = "} followed by the phase number, {@code "parties = "} 918 * followed by the number of registered parties, and {@code 919 * "arrived = "} followed by the number of arrived parties. 920 * 921 * @return a string identifying this phaser, as well as its state 922 */ toString()923 public String toString() { 924 return stateToString(reconcileState()); 925 } 926 927 /** 928 * Implementation of toString and string-based error messages 929 */ stateToString(long s)930 private String stateToString(long s) { 931 return super.toString() + 932 "[phase = " + phaseOf(s) + 933 " parties = " + partiesOf(s) + 934 " arrived = " + arrivedOf(s) + "]"; 935 } 936 937 // Waiting mechanics 938 939 /** 940 * Removes and signals threads from queue for phase. 941 */ releaseWaiters(int phase)942 private void releaseWaiters(int phase) { 943 QNode q; // first element of queue 944 Thread t; // its thread 945 AtomicReference<QNode> head = (phase & 1) == 0 ? 
evenQ : oddQ; 946 while ((q = head.get()) != null && 947 q.phase != (int)(root.state >>> PHASE_SHIFT)) { 948 if (head.compareAndSet(q, q.next) && 949 (t = q.thread) != null) { 950 q.thread = null; 951 LockSupport.unpark(t); 952 } 953 } 954 } 955 956 /** 957 * Variant of releaseWaiters that additionally tries to remove any 958 * nodes no longer waiting for advance due to timeout or 959 * interrupt. Currently, nodes are removed only if they are at 960 * head of queue, which suffices to reduce memory footprint in 961 * most usages. 962 * 963 * @return current phase on exit 964 */ abortWait(int phase)965 private int abortWait(int phase) { 966 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; 967 for (;;) { 968 Thread t; 969 QNode q = head.get(); 970 int p = (int)(root.state >>> PHASE_SHIFT); 971 if (q == null || ((t = q.thread) != null && q.phase == p)) 972 return p; 973 if (head.compareAndSet(q, q.next) && t != null) { 974 q.thread = null; 975 LockSupport.unpark(t); 976 } 977 } 978 } 979 980 /** The number of CPUs, for spin control */ 981 private static final int NCPU = Runtime.getRuntime().availableProcessors(); 982 983 /** 984 * The number of times to spin before blocking while waiting for 985 * advance, per arrival while waiting. On multiprocessors, fully 986 * blocking and waking up a large number of threads all at once is 987 * usually a very slow process, so we use rechargeable spins to 988 * avoid it when threads regularly arrive: When a thread in 989 * internalAwaitAdvance notices another arrival before blocking, 990 * and there appear to be enough CPUs available, it spins 991 * SPINS_PER_ARRIVAL more times before blocking. The value trades 992 * off good-citizenship vs big unnecessary slowdowns. 993 */ 994 static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8; 995 996 /** 997 * Possibly blocks and waits for phase to advance unless aborted. 998 * Call only on root phaser. 
999 * 1000 * @param phase current phase 1001 * @param node if non-null, the wait node to track interrupt and timeout; 1002 * if null, denotes noninterruptible wait 1003 * @return current phase 1004 */ internalAwaitAdvance(int phase, QNode node)1005 private int internalAwaitAdvance(int phase, QNode node) { 1006 // assert root == this; 1007 releaseWaiters(phase-1); // ensure old queue clean 1008 boolean queued = false; // true when node is enqueued 1009 int lastUnarrived = 0; // to increase spins upon change 1010 int spins = SPINS_PER_ARRIVAL; 1011 long s; 1012 int p; 1013 while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { 1014 if (node == null) { // spinning in noninterruptible mode 1015 int unarrived = (int)s & UNARRIVED_MASK; 1016 if (unarrived != lastUnarrived && 1017 (lastUnarrived = unarrived) < NCPU) 1018 spins += SPINS_PER_ARRIVAL; 1019 boolean interrupted = Thread.interrupted(); 1020 if (interrupted || --spins < 0) { // need node to record intr 1021 node = new QNode(this, phase, false, false, 0L); 1022 node.wasInterrupted = interrupted; 1023 } 1024 } 1025 else if (node.isReleasable()) // done or aborted 1026 break; 1027 else if (!queued) { // push onto queue 1028 AtomicReference<QNode> head = (phase & 1) == 0 ? 
evenQ : oddQ; 1029 QNode q = node.next = head.get(); 1030 if ((q == null || q.phase == phase) && 1031 (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq 1032 queued = head.compareAndSet(q, node); 1033 } 1034 else { 1035 try { 1036 ForkJoinPool.managedBlock(node); 1037 } catch (InterruptedException ie) { 1038 node.wasInterrupted = true; 1039 } 1040 } 1041 } 1042 1043 if (node != null) { 1044 if (node.thread != null) 1045 node.thread = null; // avoid need for unpark() 1046 if (node.wasInterrupted && !node.interruptible) 1047 Thread.currentThread().interrupt(); 1048 if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) 1049 return abortWait(phase); // possibly clean up on abort 1050 } 1051 releaseWaiters(phase); 1052 return p; 1053 } 1054 1055 /** 1056 * Wait nodes for Treiber stack representing wait queue 1057 */ 1058 static final class QNode implements ForkJoinPool.ManagedBlocker { 1059 final Phaser phaser; 1060 final int phase; 1061 final boolean interruptible; 1062 final boolean timed; 1063 boolean wasInterrupted; 1064 long nanos; 1065 final long deadline; 1066 volatile Thread thread; // nulled to cancel wait 1067 QNode next; 1068 QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long nanos)1069 QNode(Phaser phaser, int phase, boolean interruptible, 1070 boolean timed, long nanos) { 1071 this.phaser = phaser; 1072 this.phase = phase; 1073 this.interruptible = interruptible; 1074 this.nanos = nanos; 1075 this.timed = timed; 1076 this.deadline = timed ? 
System.nanoTime() + nanos : 0L; 1077 thread = Thread.currentThread(); 1078 } 1079 isReleasable()1080 public boolean isReleasable() { 1081 if (thread == null) 1082 return true; 1083 if (phaser.getPhase() != phase) { 1084 thread = null; 1085 return true; 1086 } 1087 if (Thread.interrupted()) 1088 wasInterrupted = true; 1089 if (wasInterrupted && interruptible) { 1090 thread = null; 1091 return true; 1092 } 1093 if (timed) { 1094 if (nanos > 0L) { 1095 nanos = deadline - System.nanoTime(); 1096 } 1097 if (nanos <= 0L) { 1098 thread = null; 1099 return true; 1100 } 1101 } 1102 return false; 1103 } 1104 block()1105 public boolean block() { 1106 if (isReleasable()) 1107 return true; 1108 else if (!timed) 1109 LockSupport.park(this); 1110 else if (nanos > 0L) 1111 LockSupport.parkNanos(this, nanos); 1112 return isReleasable(); 1113 } 1114 } 1115 1116 // Unsafe mechanics 1117 1118 private static final sun.misc.Unsafe UNSAFE; 1119 private static final long stateOffset; 1120 static { 1121 try { 1122 UNSAFE = sun.misc.Unsafe.getUnsafe(); 1123 Class<?> k = Phaser.class; 1124 stateOffset = UNSAFE.objectFieldOffset 1125 (k.getDeclaredField("state")); 1126 } catch (Exception e) { 1127 throw new Error(e); 1128 } 1129 1130 // Reduce the risk of rare disastrous classloading in first call to 1131 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 1132 Class<?> ensureLoaded = LockSupport.class; 1133 } 1134 } 1135