/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
 * A reusable synchronization barrier, similar in functionality to
 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
 * but supporting more flexible usage.
 *
 * <p><b>Registration.</b> Unlike the case for other barriers, the
 * number of parties <em>registered</em> to synchronize on a phaser
 * may vary over time.  Tasks may be registered at any time (using
 * methods {@link #register}, {@link #bulkRegister}, or forms of
 * constructors establishing initial numbers of parties), and
 * optionally deregistered upon any arrival (using {@link
 * #arriveAndDeregister}).  As is the case with most basic
 * synchronization constructs, registration and deregistration affect
 * only internal counts; they do not establish any further internal
 * bookkeeping, so tasks cannot query whether they are registered.
 * (However, you can introduce such bookkeeping by subclassing this
 * class.)
 *
 * <p><b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
 * Phaser} may be repeatedly awaited.  Method {@link
 * #arriveAndAwaitAdvance} has effect analogous to {@link
 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
 * generation of a phaser has an associated phase number. The phase
 * number starts at zero, and advances when all parties arrive at the
 * phaser, wrapping around to zero after reaching {@code
 * Integer.MAX_VALUE}. The use of phase numbers enables independent
 * control of actions upon arrival at a phaser and upon awaiting
 * others, via two kinds of methods that may be invoked by any
 * registered party:
 *
 * <ul>
 *
 *   <li> <b>Arrival.</b> Methods {@link #arrive} and
 *       {@link #arriveAndDeregister} record arrival.  These methods
 *       do not block, but return an associated <em>arrival phase
 *       number</em>; that is, the phase number of the phaser to which
 *       the arrival applied. When the final party for a given phase
 *       arrives, an optional action is performed and the phase
 *       advances.  These actions are performed by the party
 *       triggering a phase advance, and are arranged by overriding
 *       method {@link #onAdvance(int, int)}, which also controls
 *       termination. Overriding this method is similar to, but more
 *       flexible than, providing a barrier action to a {@code
 *       CyclicBarrier}.
 *
 *   <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
 *       argument indicating an arrival phase number, and returns when
 *       the phaser advances to (or is already at) a different phase.
 *       Unlike similar constructions using {@code CyclicBarrier},
 *       method {@code awaitAdvance} continues to wait even if the
 *       waiting thread is interrupted. Interruptible and timeout
 *       versions are also available, but exceptions encountered while
 *       tasks wait interruptibly or with timeout do not change the
 *       state of the phaser. If necessary, you can perform any
 *       associated recovery within handlers of those exceptions,
 *       often after invoking {@code forceTermination}.  Phasers may
 *       also be used by tasks executing in a {@link ForkJoinPool},
 *       which will ensure sufficient parallelism to execute tasks
 *       when others are blocked waiting for a phase to advance.
 *
 * </ul>
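 *
 * <p>As a minimal sketch of how these two kinds of methods combine
 * (the helper methods {@code prepareNextInput} and {@code consume}
 * are hypothetical placeholders, not part of this class), a
 * registered party can record its arrival, overlap unrelated work
 * with the arrivals of other parties, and only then wait:
 *
 *  <pre> {@code
 * void splitPhaseStep(Phaser phaser) {
 *   int phase = phaser.arrive();           // record arrival; does not block
 *   prepareNextInput();                    // hypothetical work done while others arrive
 *   int next = phaser.awaitAdvance(phase); // returns at once if already advanced
 *   if (next >= 0)                         // a negative value means terminated
 *     consume(next);                       // hypothetical use of the new phase number
 * }}</pre>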
 *
 * <p><b>Termination.</b> A phaser may enter a <em>termination</em>
 * state, that may be checked using method {@link #isTerminated}. Upon
 * termination, all synchronization methods immediately return without
 * waiting for advance, as indicated by a negative return value.
 * Similarly, attempts to register upon termination have no effect.
 * Termination is triggered when an invocation of {@code onAdvance}
 * returns {@code true}. The default implementation returns {@code
 * true} if a deregistration has caused the number of registered
 * parties to become zero.  As illustrated below, when phasers control
 * actions with a fixed number of iterations, it is often convenient
 * to override this method to cause termination when the current phase
 * number reaches a threshold. Method {@link #forceTermination} is
 * also available to abruptly release waiting threads and allow them
 * to terminate.
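 *
 * <p>As a hedged sketch of {@code forceTermination} used for
 * recovery, assuming a caller that prefers to abandon the whole
 * computation when a phase does not complete in time (the one-second
 * limit is an arbitrary choice for illustration, and user code would
 * need to import {@code TimeUnit} and {@code TimeoutException} from
 * {@code java.util.concurrent}):
 *
 *  <pre> {@code
 * boolean arriveAndAwaitOrAbort(Phaser phaser) throws InterruptedException {
 *   int phase = phaser.arrive();
 *   try {
 *     // timed, interruptible wait; a timeout leaves the phaser state unchanged
 *     return phaser.awaitAdvanceInterruptibly(phase, 1, TimeUnit.SECONDS) >= 0;
 *   } catch (TimeoutException ex) {
 *     phaser.forceTermination(); // release every other waiting party
 *     return false;
 *   }
 * }}</pre>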
 *
 * <p><b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
 * constructed in tree structures) to reduce contention. Phasers with
 * large numbers of parties that would otherwise experience heavy
 * synchronization contention costs may instead be set up so that
 * groups of sub-phasers share a common parent.  This may greatly
 * increase throughput even though it incurs greater per-operation
 * overhead.
 *
 * <p>In a tree of tiered phasers, registration and deregistration of
 * child phasers with their parent are managed automatically.
 * Whenever the number of registered parties of a child phaser becomes
 * non-zero (as established in the {@link #Phaser(Phaser,int)}
 * constructor, {@link #register}, or {@link #bulkRegister}), the
 * child phaser is registered with its parent.  Whenever the number of
 * registered parties becomes zero as the result of an invocation of
 * {@link #arriveAndDeregister}, the child phaser is deregistered
 * from its parent.
 *
 * <p><b>Monitoring.</b> While synchronization methods may be invoked
 * only by registered parties, the current state of a phaser may be
 * monitored by any caller.  At any given moment there are {@link
 * #getRegisteredParties} parties in total, of which {@link
 * #getArrivedParties} have arrived at the current phase ({@link
 * #getPhase}).  When the remaining ({@link #getUnarrivedParties})
 * parties arrive, the phase advances.  The values returned by these
 * methods may reflect transient states and so are not in general
 * useful for synchronization control.  Method {@link #toString}
 * returns snapshots of these state queries in a form convenient for
 * informal monitoring.
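 *
 * <p>For example, a monitoring thread that is not itself a registered
 * party might log progress along the following lines. This is a
 * sketch only: the four queries are separate reads, so the printed
 * values may be mutually inconsistent while parties are arriving.
 *
 *  <pre> {@code
 * void logProgress(Phaser phaser) {
 *   System.out.println("phase " + phaser.getPhase() + ": " +
 *                      phaser.getArrivedParties() + " of " +
 *                      phaser.getRegisteredParties() + " arrived, " +
 *                      phaser.getUnarrivedParties() + " outstanding");
 * }}</pre>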
 *
 * <p><b>Sample usages:</b>
 *
 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
 * to control a one-shot action serving a variable number of parties.
 * The typical idiom is for the method setting this up to first
 * register, then start the actions, then deregister, as in:
 *
 *  <pre> {@code
 * void runTasks(List<Runnable> tasks) {
 *   final Phaser phaser = new Phaser(1); // "1" to register self
 *   // create and start threads
 *   for (final Runnable task : tasks) {
 *     phaser.register();
 *     new Thread() {
 *       public void run() {
 *         phaser.arriveAndAwaitAdvance(); // await all creation
 *         task.run();
 *       }
 *     }.start();
 *   }
 *
 *   // allow threads to start and deregister self
 *   phaser.arriveAndDeregister();
 * }}</pre>
 *
 * <p>One way to cause a set of threads to repeatedly perform actions
 * for a given number of iterations is to override {@code onAdvance}:
 *
 *  <pre> {@code
 * void startTasks(List<Runnable> tasks, final int iterations) {
 *   final Phaser phaser = new Phaser() {
 *     protected boolean onAdvance(int phase, int registeredParties) {
 *       return phase >= iterations || registeredParties == 0;
 *     }
 *   };
 *   phaser.register();
 *   for (final Runnable task : tasks) {
 *     phaser.register();
 *     new Thread() {
 *       public void run() {
 *         do {
 *           task.run();
 *           phaser.arriveAndAwaitAdvance();
 *         } while (!phaser.isTerminated());
 *       }
 *     }.start();
 *   }
 *   phaser.arriveAndDeregister(); // deregister self, don't wait
 * }}</pre>
 *
 * If the main task must later await termination, it
 * may re-register and then execute a similar loop:
 *  <pre> {@code
 *   // ...
 *   phaser.register();
 *   while (!phaser.isTerminated())
 *     phaser.arriveAndAwaitAdvance();}</pre>
 *
 * <p>Related constructions may be used to await particular phase numbers
 * in contexts where you are sure that the phase will never wrap around
 * {@code Integer.MAX_VALUE}. For example:
 *
 *  <pre> {@code
 * void awaitPhase(Phaser phaser, int phase) {
 *   int p = phaser.register(); // assumes caller not already registered
 *   while (p < phase) {
 *     if (phaser.isTerminated())
 *       // ... deal with unexpected termination
 *     else
 *       p = phaser.arriveAndAwaitAdvance();
 *   }
 *   phaser.arriveAndDeregister();
 * }}</pre>
 *
 *
 * <p>To create a set of {@code n} tasks using a tree of phasers, you
 * could use code of the following form, assuming a Task class with a
 * constructor accepting a {@code Phaser} that it registers with upon
 * construction. After invocation of {@code build(new Task[n], 0, n,
 * new Phaser())}, these tasks could then be started, for example by
 * submitting to a pool:
 *
 *  <pre> {@code
 * void build(Task[] tasks, int lo, int hi, Phaser ph) {
 *   if (hi - lo > TASKS_PER_PHASER) {
 *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
 *       int j = Math.min(i + TASKS_PER_PHASER, hi);
 *       build(tasks, i, j, new Phaser(ph));
 *     }
 *   } else {
 *     for (int i = lo; i < hi; ++i)
 *       tasks[i] = new Task(ph);
 *       // assumes new Task(ph) performs ph.register()
 *   }
 * }}</pre>
 *
 * The best value of {@code TASKS_PER_PHASER} depends mainly on
 * expected synchronization rates. A value as low as four may
 * be appropriate for extremely small per-phase task bodies (thus
 * high rates), or up to hundreds for extremely large ones.
 *
 * <p><b>Implementation notes</b>: This implementation restricts the
 * maximum number of parties to 65535. Attempts to register additional
 * parties result in {@code IllegalStateException}. However, you can and
 * should create tiered phasers to accommodate arbitrarily large sets
 * of participants.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class Phaser {
    /*
     * This class implements an extension of X10 "clocks".  Thanks to
     * Vijay Saraswat for the idea, and to Vivek Sarkar for
     * enhancements to extend functionality.
     */

    /**
     * Primary state representation, holding four bit-fields:
     *
     * unarrived  -- the number of parties yet to hit barrier (bits  0-15)
     * parties    -- the number of parties to wait            (bits 16-31)
     * phase      -- the generation of the barrier            (bits 32-62)
     * terminated -- set if barrier is terminated             (bit  63 / sign)
     *
     * Except that a phaser with no registered parties is
     * distinguished by the otherwise illegal state of having zero
     * parties and one unarrived party (encoded as EMPTY below).
     *
     * To efficiently maintain atomicity, these values are packed into
     * a single (atomic) long. Good performance relies on keeping
     * state decoding and encoding simple, and keeping race windows
     * short.
     *
     * All state updates are performed via CAS except initial
     * registration of a sub-phaser (i.e., one with a non-null
     * parent).  In this (relatively rare) case, we use built-in
     * synchronization to lock while first registering with its
     * parent.
     *
     * The phase of a subphaser is allowed to lag that of its
     * ancestors until it is actually accessed -- see method
     * reconcileState.
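     *
     * As a worked example of this packing (illustrative values
     * only): a non-terminated phaser at phase 3 with 5 registered
     * parties, 2 of which have not yet arrived, has
     *
     *   state == (3L << 32) | (5L << 16) | 2L == 0x0000000300050002L
     *
     * which decodes to unarrived 2, parties 5, phase 3, and
     * arrived 5 - 2 == 3. An arrival subtracts 1 from the state,
     * and an arrival with deregistration subtracts 0x00010001,
     * decrementing the parties and unarrived fields together.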
     */
    private volatile long state;

    private static final int  MAX_PARTIES     = 0xffff;
    private static final int  MAX_PHASE       = Integer.MAX_VALUE;
    private static final int  PARTIES_SHIFT   = 16;
    private static final int  PHASE_SHIFT     = 32;
    private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
    private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
    private static final long COUNTS_MASK     = 0xffffffffL;
    private static final long TERMINATION_BIT = 1L << 63;

    // some special values
    private static final int  ONE_ARRIVAL     = 1;
    private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
    private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
    private static final int  EMPTY           = 1;

    // The following unpacking methods are usually manually inlined

    private static int unarrivedOf(long s) {
        int counts = (int)s;
        return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
    }

    private static int partiesOf(long s) {
        return (int)s >>> PARTIES_SHIFT;
    }

    private static int phaseOf(long s) {
        return (int)(s >>> PHASE_SHIFT);
    }

    private static int arrivedOf(long s) {
        int counts = (int)s;
        return (counts == EMPTY) ? 0 :
            (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
    }

    /**
     * The parent of this phaser, or null if none
     */
    private final Phaser parent;

    /**
     * The root of phaser tree. Equals this if not in a tree.
     */
    private final Phaser root;

    /**
     * Heads of Treiber stacks for waiting threads. To eliminate
     * contention when releasing some threads while adding others, we
     * use two of them, alternating across even and odd phases.
     * Subphasers share queues with root to speed up releases.
     */
    private final AtomicReference<QNode> evenQ;
    private final AtomicReference<QNode> oddQ;

    private AtomicReference<QNode> queueFor(int phase) {
        return ((phase & 1) == 0) ? evenQ : oddQ;
    }

    /**
     * Returns message string for bounds exceptions on arrival.
     */
    private String badArrive(long s) {
        return "Attempted arrival of unregistered party for " +
            stateToString(s);
    }

    /**
     * Returns message string for bounds exceptions on registration.
     */
    private String badRegister(long s) {
        return "Attempt to register more than " +
            MAX_PARTIES + " parties for " + stateToString(s);
    }

    /**
     * Main implementation for methods arrive and arriveAndDeregister.
     * Manually tuned to speed up and minimize race windows for the
     * common case of just decrementing unarrived field.
     *
     * @param adjust value to subtract from state;
     *               ONE_ARRIVAL for arrive,
     *               ONE_DEREGISTER for arriveAndDeregister
     */
    private int doArrive(int adjust) {
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
                if (unarrived == 1) {
                    long n = s & PARTIES_MASK;  // base of next state
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    if (root == this) {
                        if (onAdvance(phase, nextUnarrived))
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            n |= EMPTY;
                        else
                            n |= nextUnarrived;
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        n |= (long)nextPhase << PHASE_SHIFT;
                        UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                        releaseWaiters(phase);
                    }
                    else if (nextUnarrived == 0) { // propagate deregistration
                        phase = parent.doArrive(ONE_DEREGISTER);
                        UNSAFE.compareAndSwapLong(this, stateOffset,
                                                  s, s | EMPTY);
                    }
                    else
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                return phase;
            }
        }
    }

    /**
     * Implementation of register, bulkRegister
     *
     * @param registrations number to add to both parties and
     * unarrived fields. Must be greater than zero.
     */
    private int doRegister(int registrations) {
        // adjustment to state
        long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
        final Phaser parent = this.parent;
        int phase;
        for (;;) {
            long s = (parent == null) ? state : reconcileState();
            int counts = (int)s;
            int parties = counts >>> PARTIES_SHIFT;
            int unarrived = counts & UNARRIVED_MASK;
            if (registrations > MAX_PARTIES - parties)
                throw new IllegalStateException(badRegister(s));
            phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                break;
            if (counts != EMPTY) {                  // not 1st registration
                if (parent == null || reconcileState() == s) {
                    if (unarrived == 0)             // wait out advance
                        root.internalAwaitAdvance(phase, null);
                    else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                       s, s + adjust))
                        break;
                }
            }
            else if (parent == null) {              // 1st root registration
                long next = ((long)phase << PHASE_SHIFT) | adjust;
                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                    break;
            }
            else {
                synchronized (this) {               // 1st sub registration
                    if (state == s) {               // recheck under lock
                        phase = parent.doRegister(1);
                        if (phase < 0)
                            break;
                        // finish registration whenever parent registration
                        // succeeded, even when racing with termination,
                        // since these are part of the same "transaction".
                        while (!UNSAFE.compareAndSwapLong
                               (this, stateOffset, s,
                                ((long)phase << PHASE_SHIFT) | adjust)) {
                            s = state;
                            phase = (int)(root.state >>> PHASE_SHIFT);
                            // assert (int)s == EMPTY;
                        }
                        break;
                    }
                }
            }
        }
        return phase;
    }

    /**
     * Resolves lagged phase propagation from root if necessary.
     * Reconciliation normally occurs when root has advanced but
     * subphasers have not yet done so, in which case they must finish
     * their own advance by setting unarrived to parties (or if
     * parties is zero, resetting to unregistered EMPTY state).
     *
     * @return reconciled state
     */
    private long reconcileState() {
        final Phaser root = this.root;
        long s = state;
        if (root != this) {
            int phase, p;
            // CAS to root phase with current parties, tripping unarrived
            while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
                   (int)(s >>> PHASE_SHIFT) &&
                   !UNSAFE.compareAndSwapLong
                   (this, stateOffset, s,
                    s = (((long)phase << PHASE_SHIFT) |
                         ((phase < 0) ? (s & COUNTS_MASK) :
                          (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                           ((s & PARTIES_MASK) | p))))))
                s = state;
        }
        return s;
    }

    /**
     * Creates a new phaser with no initially registered parties, no
     * parent, and initial phase number 0. Any thread using this
     * phaser will need to first register for it.
     */
    public Phaser() {
        this(null, 0);
    }

    /**
     * Creates a new phaser with the given number of registered
     * unarrived parties, no parent, and initial phase number 0.
     *
     * @param parties the number of parties required to advance to the
     * next phase
     * @throws IllegalArgumentException if parties less than zero
     * or greater than the maximum number of parties supported
     */
    public Phaser(int parties) {
        this(null, parties);
    }

    /**
     * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
     *
     * @param parent the parent phaser
     */
    public Phaser(Phaser parent) {
        this(parent, 0);
    }

    /**
     * Creates a new phaser with the given parent and number of
     * registered unarrived parties.  When the given parent is non-null
     * and the given number of parties is greater than zero, this
     * child phaser is registered with its parent.
     *
     * @param parent the parent phaser
     * @param parties the number of parties required to advance to the
     * next phase
     * @throws IllegalArgumentException if parties less than zero
     * or greater than the maximum number of parties supported
     */
    public Phaser(Phaser parent, int parties) {
        if (parties >>> PARTIES_SHIFT != 0)
            throw new IllegalArgumentException("Illegal number of parties");
        int phase = 0;
        this.parent = parent;
        if (parent != null) {
            final Phaser root = parent.root;
            this.root = root;
            this.evenQ = root.evenQ;
            this.oddQ = root.oddQ;
            if (parties != 0)
                phase = parent.doRegister(1);
        }
        else {
            this.root = this;
            this.evenQ = new AtomicReference<QNode>();
            this.oddQ = new AtomicReference<QNode>();
        }
        this.state = (parties == 0) ? (long)EMPTY :
            ((long)phase << PHASE_SHIFT) |
            ((long)parties << PARTIES_SHIFT) |
            ((long)parties);
    }

    /**
     * Adds a new unarrived party to this phaser.  If an ongoing
     * invocation of {@link #onAdvance} is in progress, this method
     * may await its completion before returning.  If this phaser has
     * a parent, and this phaser previously had no registered parties,
     * this child phaser is also registered with its parent. If
     * this phaser is terminated, the attempt to register has
     * no effect, and a negative value is returned.
     *
     * @return the arrival phase number to which this registration
     * applied.  If this value is negative, then this phaser has
     * terminated, in which case registration has no effect.
     * @throws IllegalStateException if attempting to register more
     * than the maximum supported number of parties
     */
    public int register() {
        return doRegister(1);
    }

    /**
     * Adds the given number of new unarrived parties to this phaser.
     * If an ongoing invocation of {@link #onAdvance} is in progress,
     * this method may await its completion before returning.  If this
     * phaser has a parent, and the given number of parties is greater
     * than zero, and this phaser previously had no registered
     * parties, this child phaser is also registered with its parent.
     * If this phaser is terminated, the attempt to register has no
     * effect, and a negative value is returned.
     *
     * @param parties the number of additional parties required to
     * advance to the next phase
     * @return the arrival phase number to which this registration
     * applied.  If this value is negative, then this phaser has
     * terminated, in which case registration has no effect.
     * @throws IllegalStateException if attempting to register more
     * than the maximum supported number of parties
     * @throws IllegalArgumentException if {@code parties < 0}
     */
    public int bulkRegister(int parties) {
        if (parties < 0)
            throw new IllegalArgumentException();
        if (parties == 0)
            return getPhase();
        return doRegister(parties);
    }

    /**
     * Arrives at this phaser, without waiting for others to arrive.
     *
     * <p>It is a usage error for an unregistered party to invoke this
     * method.  However, this error may result in an {@code
     * IllegalStateException} only upon some subsequent operation on
     * this phaser, if ever.
     *
     * @return the arrival phase number, or a negative value if terminated
     * @throws IllegalStateException if not terminated and the number
     * of unarrived parties would become negative
     */
    public int arrive() {
        return doArrive(ONE_ARRIVAL);
    }

    /**
     * Arrives at this phaser and deregisters from it without waiting
     * for others to arrive. Deregistration reduces the number of
     * parties required to advance in future phases.  If this phaser
     * has a parent, and deregistration causes this phaser to have
     * zero parties, this phaser is also deregistered from its parent.
     *
     * <p>It is a usage error for an unregistered party to invoke this
     * method.  However, this error may result in an {@code
     * IllegalStateException} only upon some subsequent operation on
     * this phaser, if ever.
     *
     * @return the arrival phase number, or a negative value if terminated
     * @throws IllegalStateException if not terminated and the number
     * of registered or unarrived parties would become negative
     */
    public int arriveAndDeregister() {
        return doArrive(ONE_DEREGISTER);
    }

    /**
     * Arrives at this phaser and awaits others. Equivalent in effect
     * to {@code awaitAdvance(arrive())}.  If you need to await with
     * interruption or timeout, you can arrange this with an analogous
     * construction using one of the other forms of the {@code
     * awaitAdvance} method.  If instead you need to deregister upon
     * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
     *
     * <p>It is a usage error for an unregistered party to invoke this
     * method.  However, this error may result in an {@code
     * IllegalStateException} only upon some subsequent operation on
     * this phaser, if ever.
     *
     * @return the arrival phase number, or the (negative)
     * {@linkplain #getPhase() current phase} if terminated
     * @throws IllegalStateException if not terminated and the number
     * of unarrived parties would become negative
     */
    public int arriveAndAwaitAdvance() {
        // Specialization of doArrive+awaitAdvance eliminating some reads/paths
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                          s -= ONE_ARRIVAL)) {
                if (unarrived > 1)
                    return root.internalAwaitAdvance(phase, null);
                if (root != this)
                    return parent.arriveAndAwaitAdvance();
                long n = s & PARTIES_MASK;  // base of next state
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (onAdvance(phase, nextUnarrived))
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    n |= EMPTY;
                else
                    n |= nextUnarrived;
                int nextPhase = (phase + 1) & MAX_PHASE;
                n |= (long)nextPhase << PHASE_SHIFT;
                if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                    return (int)(state >>> PHASE_SHIFT); // terminated
                releaseWaiters(phase);
                return nextPhase;
            }
        }
    }

    /**
681      * Awaits the phase of this phaser to advance from the given phase
682      * value, returning immediately if the current phase is not equal
683      * to the given phase value or this phaser is terminated.
684      *
685      * @param phase an arrival phase number, or negative value if
686      * terminated; this argument is normally the value returned by a
687      * previous call to {@code arrive} or {@code arriveAndDeregister}.
688      * @return the next arrival phase number, or the argument if it is
689      * negative, or the (negative) {@linkplain #getPhase() current phase}
690      * if terminated
691      */
awaitAdvance(int phase)692     public int awaitAdvance(int phase) {
693         final Phaser root = this.root;
694         long s = (root == this) ? state : reconcileState();
695         int p = (int)(s >>> PHASE_SHIFT);
696         if (phase < 0)
697             return phase;
698         if (p == phase)
699             return root.internalAwaitAdvance(phase, null);
700         return p;
701     }
702 
703     /**
704      * Awaits the phase of this phaser to advance from the given phase
705      * value, throwing {@code InterruptedException} if interrupted
706      * while waiting, or returning immediately if the current phase is
707      * not equal to the given phase value or this phaser is
708      * terminated.
709      *
710      * @param phase an arrival phase number, or negative value if
711      * terminated; this argument is normally the value returned by a
712      * previous call to {@code arrive} or {@code arriveAndDeregister}.
713      * @return the next arrival phase number, or the argument if it is
714      * negative, or the (negative) {@linkplain #getPhase() current phase}
715      * if terminated
716      * @throws InterruptedException if thread interrupted while waiting
717      */
718     public int awaitAdvanceInterruptibly(int phase)
719         throws InterruptedException {
720         final Phaser root = this.root;
721         long s = (root == this) ? state : reconcileState();
722         int p = (int)(s >>> PHASE_SHIFT);
723         if (phase < 0)
724             return phase;
725         if (p == phase) {
726             QNode node = new QNode(this, phase, true, false, 0L);
727             p = root.internalAwaitAdvance(phase, node);
728             if (node.wasInterrupted)
729                 throw new InterruptedException();
730         }
731         return p;
732     }
733 
734     /**
735      * Awaits the phase of this phaser to advance from the given phase
736      * value or the given timeout to elapse, throwing {@code
737      * InterruptedException} if interrupted while waiting, or
738      * returning immediately if the current phase is not equal to the
739      * given phase value or this phaser is terminated.
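         *
         * <p>A hedged sketch of bounding a wait (the one-second timeout
         * and the choice to terminate on timeout are illustrative
         * assumptions, and {@code phaser} is assumed to be in scope):
         *
         * <pre> {@code
         * int phase = phaser.arrive();
         * try {
         *   phaser.awaitAdvanceInterruptibly(phase, 1, TimeUnit.SECONDS);
         * } catch (TimeoutException e) {
         *   phaser.forceTermination();           // give up; release other waiters
         * } catch (InterruptedException e) {
         *   Thread.currentThread().interrupt();  // preserve interrupt status
         * }}</pre>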
740      *
741      * @param phase an arrival phase number, or negative value if
742      * terminated; this argument is normally the value returned by a
743      * previous call to {@code arrive} or {@code arriveAndDeregister}.
744      * @param timeout how long to wait before giving up, in units of
745      *        {@code unit}
746      * @param unit a {@code TimeUnit} determining how to interpret the
747      *        {@code timeout} parameter
748      * @return the next arrival phase number, or the argument if it is
749      * negative, or the (negative) {@linkplain #getPhase() current phase}
750      * if terminated
751      * @throws InterruptedException if thread interrupted while waiting
752      * @throws TimeoutException if timed out while waiting
753      */
754     public int awaitAdvanceInterruptibly(int phase,
755                                          long timeout, TimeUnit unit)
756         throws InterruptedException, TimeoutException {
757         long nanos = unit.toNanos(timeout);
758         final Phaser root = this.root;
759         long s = (root == this) ? state : reconcileState();
760         int p = (int)(s >>> PHASE_SHIFT);
761         if (phase < 0)
762             return phase;
763         if (p == phase) {
764             QNode node = new QNode(this, phase, true, true, nanos);
765             p = root.internalAwaitAdvance(phase, node);
766             if (node.wasInterrupted)
767                 throw new InterruptedException();
768             else if (p == phase)
769                 throw new TimeoutException();
770         }
771         return p;
772     }
773 
774     /**
775      * Forces this phaser to enter termination state.  Counts of
776      * registered parties are unaffected.  If this phaser is a member
777      * of a tiered set of phasers, then all of the phasers in the set
778      * are terminated.  If this phaser is already terminated, this
779      * method has no effect.  This method may be useful for
780      * coordinating recovery after one or more tasks encounter
781      * unexpected exceptions.
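         *
         * <p>One possible recovery pattern, shown only as a sketch (the
         * helper method {@code runStep} and its parameters are
         * hypothetical and not part of this class):
         *
         * <pre> {@code
         * void runStep(Phaser phaser, Runnable step) {
         *   try {
         *     step.run();
         *     phaser.arriveAndAwaitAdvance();
         *   } catch (RuntimeException ex) {
         *     phaser.forceTermination();  // wake parties blocked on this phase
         *     throw ex;
         *   }
         * }}</pre>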
782      */
783     public void forceTermination() {
784         // Only need to change root state
785         final Phaser root = this.root;
786         long s;
787         while ((s = root.state) >= 0) {
788             if (UNSAFE.compareAndSwapLong(root, stateOffset,
789                                           s, s | TERMINATION_BIT)) {
790                 // signal all threads
791                 releaseWaiters(0); // Waiters on evenQ
792                 releaseWaiters(1); // Waiters on oddQ
793                 return;
794             }
795         }
796     }
797 
798     /**
799      * Returns the current phase number. The maximum phase number is
800      * {@code Integer.MAX_VALUE}, after which it restarts at
801      * zero. Upon termination, the phase number is negative,
802      * in which case the prevailing phase prior to termination
803      * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
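         *
         * <p>As a worked illustration (the single-party phaser is an
         * assumption chosen to keep the example short), the addition
         * relies on {@code int} overflow to clear the sign bit:
         *
         * <pre> {@code
         * Phaser phaser = new Phaser(1);
         * phaser.arriveAndAwaitAdvance();      // phase 0 -> 1
         * phaser.arriveAndAwaitAdvance();      // phase 1 -> 2
         * phaser.forceTermination();
         * int raw = phaser.getPhase();         // negative: 2 + Integer.MIN_VALUE
         * int last = raw + Integer.MIN_VALUE;  // wraps around to 2
         * }</pre>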
804      *
805      * @return the phase number, or a negative value if terminated
806      */
807     public final int getPhase() {
808         return (int)(root.state >>> PHASE_SHIFT);
809     }
810 
811     /**
812      * Returns the number of parties registered at this phaser.
813      *
814      * @return the number of parties
815      */
816     public int getRegisteredParties() {
817         return partiesOf(state);
818     }
819 
820     /**
821      * Returns the number of registered parties that have arrived at
822      * the current phase of this phaser. If this phaser has terminated,
823      * the returned value is meaningless and arbitrary.
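         *
         * <p>For illustration (the party count of three is an arbitrary
         * assumption), the count-reporting methods relate as follows on
         * a phaser that has not terminated:
         *
         * <pre> {@code
         * Phaser phaser = new Phaser(3);
         * phaser.arrive();
         * phaser.getRegisteredParties();  // 3
         * phaser.getArrivedParties();     // 1
         * phaser.getUnarrivedParties();   // 2
         * }</pre>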
824      *
825      * @return the number of arrived parties
826      */
827     public int getArrivedParties() {
828         return arrivedOf(reconcileState());
829     }
830 
831     /**
832      * Returns the number of registered parties that have not yet
833      * arrived at the current phase of this phaser. If this phaser has
834      * terminated, the returned value is meaningless and arbitrary.
835      *
836      * @return the number of unarrived parties
837      */
838     public int getUnarrivedParties() {
839         return unarrivedOf(reconcileState());
840     }
841 
842     /**
843      * Returns the parent of this phaser, or {@code null} if none.
844      *
845      * @return the parent of this phaser, or {@code null} if none
846      */
847     public Phaser getParent() {
848         return parent;
849     }
850 
851     /**
852      * Returns the root ancestor of this phaser, which is the same as
853      * this phaser if it has no parent.
854      *
855      * @return the root ancestor of this phaser
856      */
857     public Phaser getRoot() {
858         return root;
859     }
860 
861     /**
862      * Returns {@code true} if this phaser has been terminated.
863      *
864      * @return {@code true} if this phaser has been terminated
865      */
866     public boolean isTerminated() {
867         return root.state < 0L;
868     }
869 
870     /**
871      * Overridable method to perform an action upon impending phase
872      * advance, and to control termination. This method is invoked
873      * upon arrival of the party advancing this phaser (when all other
874      * waiting parties are dormant).  If this method returns {@code
875      * true}, this phaser will be set to a final termination state
876      * upon advance, and subsequent calls to {@link #isTerminated}
877      * will return true. Any (unchecked) Exception or Error thrown by
878      * an invocation of this method is propagated to the party
879      * attempting to advance this phaser, in which case no advance
880      * occurs.
881      *
882      * <p>The arguments to this method provide the state of the phaser
883      * prevailing for the current transition.  The effects of invoking
884      * arrival, registration, and waiting methods on this phaser from
885      * within {@code onAdvance} are unspecified and should not be
886      * relied on.
887      *
888      * <p>If this phaser is a member of a tiered set of phasers, then
889      * {@code onAdvance} is invoked only for its root phaser on each
890      * advance.
891      *
892      * <p>To support the most common use cases, the default
893      * implementation of this method returns {@code true} when the
894      * number of registered parties has become zero as the result of a
895      * party invoking {@code arriveAndDeregister}.  You can disable
896      * this behavior, thus enabling continuation upon future
897      * registrations, by overriding this method to always return
898      * {@code false}:
899      *
900      * <pre> {@code
901      * Phaser phaser = new Phaser() {
902      *   protected boolean onAdvance(int phase, int parties) { return false; }
903      * };}</pre>
904      *
905      * @param phase the current phase number on entry to this method,
906      * before this phaser is advanced
907      * @param registeredParties the current number of registered parties
908      * @return {@code true} if this phaser should terminate
909      */
910     protected boolean onAdvance(int phase, int registeredParties) {
911         return registeredParties == 0;
912     }
913 
914     /**
915      * Returns a string identifying this phaser, as well as its
916      * state.  The state, in brackets, includes the String {@code
917      * "phase = "} followed by the phase number, {@code "parties = "}
918      * followed by the number of registered parties, and {@code
919      * "arrived = "} followed by the number of arrived parties.
920      *
921      * @return a string identifying this phaser, as well as its state
922      */
923     public String toString() {
924         return stateToString(reconcileState());
925     }
926 
927     /**
928      * Implementation of toString and string-based error messages.
929      */
930     private String stateToString(long s) {
931         return super.toString() +
932             "[phase = " + phaseOf(s) +
933             " parties = " + partiesOf(s) +
934             " arrived = " + arrivedOf(s) + "]";
935     }
936 
937     // Waiting mechanics
938 
939     /**
940      * Removes and signals threads from queue for phase.
941      */
942     private void releaseWaiters(int phase) {
943         QNode q;   // first element of queue
944         Thread t;  // its thread
945         AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
946         while ((q = head.get()) != null &&
947                q.phase != (int)(root.state >>> PHASE_SHIFT)) {
948             if (head.compareAndSet(q, q.next) &&
949                 (t = q.thread) != null) {
950                 q.thread = null;
951                 LockSupport.unpark(t);
952             }
953         }
954     }
955 
956     /**
957      * Variant of releaseWaiters that additionally tries to remove any
958      * nodes no longer waiting for advance due to timeout or
959      * interrupt. Currently, nodes are removed only if they are at
960      * head of queue, which suffices to reduce memory footprint in
961      * most usages.
962      *
963      * @return current phase on exit
964      */
965     private int abortWait(int phase) {
966         AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
967         for (;;) {
968             Thread t;
969             QNode q = head.get();
970             int p = (int)(root.state >>> PHASE_SHIFT);
971             if (q == null || ((t = q.thread) != null && q.phase == p))
972                 return p;
973             if (head.compareAndSet(q, q.next) && t != null) {
974                 q.thread = null;
975                 LockSupport.unpark(t);
976             }
977         }
978     }
979 
980     /** The number of CPUs, for spin control */
981     private static final int NCPU = Runtime.getRuntime().availableProcessors();
982 
983     /**
984      * The number of times to spin before blocking while waiting for
985      * advance, per arrival while waiting. On multiprocessors, fully
986      * blocking and waking up a large number of threads all at once is
987      * usually a very slow process, so we use rechargeable spins to
988      * avoid it when threads regularly arrive: When a thread in
989      * internalAwaitAdvance notices another arrival before blocking,
990      * and there appear to be enough CPUs available, it spins
991      * SPINS_PER_ARRIVAL more times before blocking. The value trades
992      * off good-citizenship vs big unnecessary slowdowns.
993      */
994     static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
995 
996     /**
997      * Possibly blocks and waits for phase to advance unless aborted.
998      * Call only on root phaser.
999      *
1000      * @param phase current phase
1001      * @param node if non-null, the wait node to track interrupt and timeout;
1002      * if null, denotes noninterruptible wait
1003      * @return current phase
1004      */
1005     private int internalAwaitAdvance(int phase, QNode node) {
1006         // assert root == this;
1007         releaseWaiters(phase-1);          // ensure old queue clean
1008         boolean queued = false;           // true when node is enqueued
1009         int lastUnarrived = 0;            // to increase spins upon change
1010         int spins = SPINS_PER_ARRIVAL;
1011         long s;
1012         int p;
1013         while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
1014             if (node == null) {           // spinning in noninterruptible mode
1015                 int unarrived = (int)s & UNARRIVED_MASK;
1016                 if (unarrived != lastUnarrived &&
1017                     (lastUnarrived = unarrived) < NCPU)
1018                     spins += SPINS_PER_ARRIVAL;
1019                 boolean interrupted = Thread.interrupted();
1020                 if (interrupted || --spins < 0) { // need node to record interrupt
1021                     node = new QNode(this, phase, false, false, 0L);
1022                     node.wasInterrupted = interrupted;
1023                 }
1024             }
1025             else if (node.isReleasable()) // done or aborted
1026                 break;
1027             else if (!queued) {           // push onto queue
1028                 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
1029                 QNode q = node.next = head.get();
1030                 if ((q == null || q.phase == phase) &&
1031                     (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
1032                     queued = head.compareAndSet(q, node);
1033             }
1034             else {
1035                 try {
1036                     ForkJoinPool.managedBlock(node);
1037                 } catch (InterruptedException ie) {
1038                     node.wasInterrupted = true;
1039                 }
1040             }
1041         }
1042 
1043         if (node != null) {
1044             if (node.thread != null)
1045                 node.thread = null;       // avoid need for unpark()
1046             if (node.wasInterrupted && !node.interruptible)
1047                 Thread.currentThread().interrupt();
1048             if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
1049                 return abortWait(phase); // possibly clean up on abort
1050         }
1051         releaseWaiters(phase);
1052         return p;
1053     }
1054 
1055     /**
1056      * Wait nodes for Treiber stack representing wait queue.
1057      */
1058     static final class QNode implements ForkJoinPool.ManagedBlocker {
1059         final Phaser phaser;
1060         final int phase;
1061         final boolean interruptible;
1062         final boolean timed;
1063         boolean wasInterrupted;
1064         long nanos;
1065         final long deadline;
1066         volatile Thread thread; // nulled to cancel wait
1067         QNode next;
1068 
1069         QNode(Phaser phaser, int phase, boolean interruptible,
1070               boolean timed, long nanos) {
1071             this.phaser = phaser;
1072             this.phase = phase;
1073             this.interruptible = interruptible;
1074             this.nanos = nanos;
1075             this.timed = timed;
1076             this.deadline = timed ? System.nanoTime() + nanos : 0L;
1077             thread = Thread.currentThread();
1078         }
1079 
1080         public boolean isReleasable() {
1081             if (thread == null)
1082                 return true;
1083             if (phaser.getPhase() != phase) {
1084                 thread = null;
1085                 return true;
1086             }
1087             if (Thread.interrupted())
1088                 wasInterrupted = true;
1089             if (wasInterrupted && interruptible) {
1090                 thread = null;
1091                 return true;
1092             }
1093             if (timed) {
1094                 if (nanos > 0L) {
1095                     nanos = deadline - System.nanoTime();
1096                 }
1097                 if (nanos <= 0L) {
1098                     thread = null;
1099                     return true;
1100                 }
1101             }
1102             return false;
1103         }
1104 
1105         public boolean block() {
1106             if (isReleasable())
1107                 return true;
1108             else if (!timed)
1109                 LockSupport.park(this);
1110             else if (nanos > 0L)
1111                 LockSupport.parkNanos(this, nanos);
1112             return isReleasable();
1113         }
1114     }
1115 
1116     // Unsafe mechanics
1117 
1118     private static final sun.misc.Unsafe UNSAFE;
1119     private static final long stateOffset;
1120     static {
1121         try {
1122             UNSAFE = sun.misc.Unsafe.getUnsafe();
1123             Class<?> k = Phaser.class;
1124             stateOffset = UNSAFE.objectFieldOffset
1125                 (k.getDeclaredField("state"));
1126         } catch (Exception e) {
1127             throw new Error(e);
1128         }
1129 
1130         // Reduce the risk of rare disastrous classloading in first call to
1131         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1132         Class<?> ensureLoaded = LockSupport.class;
1133     }
1134 }
1135