/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
6 
7 package java.util.concurrent.atomic;
8 
9 import java.util.Arrays;
10 import java.util.concurrent.ThreadLocalRandom;
11 import java.util.function.DoubleBinaryOperator;
12 import java.util.function.LongBinaryOperator;
13 
/**
 * A package-local class holding common representation and mechanics
 * for classes supporting dynamic striping on 64-bit values. The class
 * extends Number so that concrete subclasses must publicly do so.
 */
@SuppressWarnings("serial")
abstract class Striped64 extends Number {
    /*
     * This class maintains a lazily-initialized table of atomically
     * updated variables, plus an extra "base" field. The table size
     * is a power of two. Indexing uses masked per-thread hash codes.
     * Nearly all declarations in this class are package-private,
     * accessed directly by subclasses.
     *
     * Table entries are of class Cell; a variant of AtomicLong padded
     * (via @Contended) to reduce cache contention. Padding is
     * overkill for most Atomics because they are usually irregularly
     * scattered in memory and thus don't interfere much with each
     * other. But Atomic objects residing in arrays will tend to be
     * placed adjacent to each other, and so will most often share
     * cache lines (with a huge negative performance impact) without
     * this precaution. (Note: in this variant of the file the
     * @Contended annotation on Cell is commented out.)
     *
     * In part because Cells are relatively large, we avoid creating
     * them until they are needed.  When there is no contention, all
     * updates are made to the base field.  Upon first contention (a
     * failed CAS on base update), the table is initialized to size 2.
     * The table size is doubled upon further contention until
     * reaching the nearest power of two greater than or equal to the
     * number of CPUS. Table slots remain empty (null) until they are
     * needed.
     *
     * A single spinlock ("cellsBusy") is used for initializing and
     * resizing the table, as well as populating slots with new Cells.
     * There is no need for a blocking lock; when the lock is not
     * available, threads try other slots (or the base).  During these
     * retries, there is increased contention and reduced locality,
     * which is still better than alternatives.
     *
     * The Thread probe fields maintained via ThreadLocalRandom serve
     * as per-thread hash codes. We let them remain uninitialized as
     * zero (if they come in this way) until they contend at slot
     * 0. They are then initialized to values that typically do not
     * conflict with others.  Contention and/or table collisions
     * are indicated by failed CASes when performing an update
     * operation. Upon a collision, if the table size is less than
     * the capacity, it is doubled in size unless some other thread
     * holds the lock. If a hashed slot is empty, and lock is
     * available, a new Cell is created. Otherwise, if the slot
     * exists, a CAS is tried.  Retries proceed by "double hashing",
     * using a secondary hash (Marsaglia XorShift) to try to find a
     * free slot.
     *
     * The table size is capped because, when there are more threads
     * than CPUs, supposing that each thread were bound to a CPU,
     * there would exist a perfect hash function mapping threads to
     * slots that eliminates collisions. When we reach capacity, we
     * search for this mapping by randomly varying the hash codes of
     * colliding threads.  Because search is random, and collisions
     * only become known via CAS failures, convergence can be slow,
     * and because threads are typically not bound to CPUS forever,
     * may not occur at all. However, despite these limitations,
     * observed contention rates are typically low in these cases.
     *
     * It is possible for a Cell to become unused when threads that
     * once hashed to it terminate, as well as in the case where
     * doubling the table causes no thread to hash to it under
     * expanded mask.  We do not try to detect or remove such cells,
     * under the assumption that for long-running instances, observed
     * contention levels will recur, so the cells will eventually be
     * needed again; and for short-lived ones, it does not matter.
     */

    /**
     * Padded variant of AtomicLong supporting only raw accesses plus CAS.
     *
     * JVM intrinsics note: It would be possible to use a release-only
     * form of CAS here, if it were provided.
     */
    // @jdk.internal.vm.annotation.Contended // android-removed
    static final class Cell {
        /** The current value held by this cell; read directly by callers. */
        volatile long value;

        /** Creates a cell whose initial value is {@code x}. */
        Cell(long x) { value = x; }

        /**
         * Atomically sets the value to {@code val} if it currently
         * equals {@code cmp}.
         *
         * @return true if the swap was performed
         */
        final boolean cas(long cmp, long val) {
            return U.compareAndSwapLong(this, VALUE, cmp, val);
        }

        /** Resets the value to zero using a volatile write. */
        final void reset() {
            U.putLongVolatile(this, VALUE, 0L);
        }

        /** Resets the value to {@code identity} using a volatile write. */
        final void reset(long identity) {
            U.putLongVolatile(this, VALUE, identity);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        /** Offset of the {@code value} field, used for raw CAS/writes. */
        private static final long VALUE;
        static {
            try {
                VALUE = U.objectFieldOffset
                    (Cell.class.getDeclaredField("value"));
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }

    /** Number of CPUS, to place bound on table size */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    transient volatile long base;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     * Zero means unlocked; it is set to 1 by {@link #casCellsBusy} and
     * released by a plain volatile write of 0.
     */
    transient volatile int cellsBusy;

    /**
     * Package-private default constructor.
     */
    Striped64() {
    }

    /**
     * CASes the base field.
     *
     * @param cmp the expected current value of {@code base}
     * @param val the new value
     * @return true if {@code base} was updated
     */
    final boolean casBase(long cmp, long val) {
        return U.compareAndSwapLong(this, BASE, cmp, val);
    }

    /**
     * CASes the cellsBusy field from 0 to 1 to acquire lock.
     *
     * @return true if the lock was acquired by this call
     */
    final boolean casCellsBusy() {
        return U.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

    /**
     * Returns the probe value for the current thread.
     * Duplicated from ThreadLocalRandom because of packaging restrictions.
     */
    static final int getProbe() {
        return U.getInt(Thread.currentThread(), PROBE);
    }

    /**
     * Pseudo-randomly advances and records the given probe value for the
     * given thread.
     * Duplicated from ThreadLocalRandom because of packaging restrictions.
     *
     * @param probe the current probe value
     * @return the new probe value, which has also been stored in the
     *         current thread's probe field
     */
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        U.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

    /**
     * Handles cases of updates involving initialization, resizing,
     * creating new Cells, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * <p>Each pass of the retry loop takes one of three routes: update
     * (or create) the Cell selected by the thread's probe if a table
     * exists; otherwise try to create the initial two-slot table under
     * the {@code cellsBusy} lock; otherwise try to CAS {@code base}.
     * A newly created Cell is seeded with {@code x} itself, without
     * applying {@code fn}.
     *
     * @param x the value
     * @param fn the update function, or null for add (this convention
     * avoids the need for an extra field or function in LongAdder).
     * @param wasUncontended false if CAS failed before call
     */
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            // The failed CAS (if any) happened with a zero probe, so it
            // says nothing about the slot the fresh probe maps to.
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        done: for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    break done;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value,
                               (fn == null) ? v + x : fn.applyAsLong(v, x)))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;             // Expand only on a second failure
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as)        // Expand table unless stale
                            cells = Arrays.copyOf(as, n << 1);
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);            // Rehash before the next attempt
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        break done;
                    }
                } finally {
                    cellsBusy = 0;
                }
            }
            // Fall back on using base
            else if (casBase(v = base,
                             (fn == null) ? v + x : fn.applyAsLong(v, x)))
                break done;
        }
    }

    /**
     * Combines the double encoded in {@code v} with {@code x} and
     * returns the result re-encoded as raw long bits.
     *
     * @param fn the update function, or null for plain addition
     * @param v the current value, as the long bits of a double
     * @param x the operand
     * @return the raw long bits of the combined double
     */
    private static long apply(DoubleBinaryOperator fn, long v, double x) {
        double d = Double.longBitsToDouble(v);
        d = (fn == null) ? d + x : fn.applyAsDouble(d, x);
        return Double.doubleToRawLongBits(d);
    }

    /**
     * Same as longAccumulate, but injecting long/double conversions
     * in too many places to sensibly merge with long version, given
     * the low-overhead requirements of this class. So must instead be
     * maintained by copy/paste/adapt.
     *
     * <p>Values are stored in {@code base} and in Cells as the raw
     * long bits of a double; a newly created Cell is seeded with the
     * raw bits of {@code x}, without applying {@code fn}.
     *
     * @param x the value
     * @param fn the update function, or null for add
     * @param wasUncontended false if CAS failed before call
     */
    final void doubleAccumulate(double x, DoubleBinaryOperator fn,
                                boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            // The failed CAS (if any) happened with a zero probe, so it
            // says nothing about the slot the fresh probe maps to.
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        done: for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(Double.doubleToRawLongBits(x));
                        if (cellsBusy == 0 && casCellsBusy()) {
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    break done;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, apply(fn, v, x)))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;             // Expand only on a second failure
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as)        // Expand table unless stale
                            cells = Arrays.copyOf(as, n << 1);
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);            // Rehash before the next attempt
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));
                        cells = rs;
                        break done;
                    }
                } finally {
                    cellsBusy = 0;
                }
            }
            // Fall back on using base
            else if (casBase(v = base, apply(fn, v, x)))
                break done;
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    /** Offset of the {@code base} field. */
    private static final long BASE;
    /** Offset of the {@code cellsBusy} field. */
    private static final long CELLSBUSY;
    /** Offset of {@code Thread.threadLocalRandomProbe}. */
    private static final long PROBE;
    static {
        try {
            BASE = U.objectFieldOffset
                (Striped64.class.getDeclaredField("base"));
            CELLSBUSY = U.objectFieldOffset
                (Striped64.class.getDeclaredField("cellsBusy"));

            PROBE = U.objectFieldOffset
                (Thread.class.getDeclaredField("threadLocalRandomProbe"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }

}
366