• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea, Bill Scherer, and Michael Scott with
32  * assistance from members of JCP JSR-166 Expert Group and released to
33  * the public domain, as explained at
34  * http://creativecommons.org/publicdomain/zero/1.0/
35  */
36 
37 package java.util.concurrent;
38 
39 /**
40  * A synchronization point at which threads can pair and swap elements
41  * within pairs.  Each thread presents some object on entry to the
42  * {@link #exchange exchange} method, matches with a partner thread,
43  * and receives its partner's object on return.  An Exchanger may be
44  * viewed as a bidirectional form of a {@link SynchronousQueue}.
45  * Exchangers may be useful in applications such as genetic algorithms
46  * and pipeline designs.
47  *
48  * <p><b>Sample Usage:</b>
49  * Here are the highlights of a class that uses an {@code Exchanger}
50  * to swap buffers between threads so that the thread filling the
51  * buffer gets a freshly emptied one when it needs it, handing off the
52  * filled one to the thread emptying the buffer.
53  * <pre> {@code
54  * class FillAndEmpty {
55  *   Exchanger<DataBuffer> exchanger = new Exchanger<>();
56  *   DataBuffer initialEmptyBuffer = ... a made-up type
57  *   DataBuffer initialFullBuffer = ...
58  *
59  *   class FillingLoop implements Runnable {
60  *     public void run() {
61  *       DataBuffer currentBuffer = initialEmptyBuffer;
62  *       try {
63  *         while (currentBuffer != null) {
64  *           addToBuffer(currentBuffer);
65  *           if (currentBuffer.isFull())
66  *             currentBuffer = exchanger.exchange(currentBuffer);
67  *         }
68  *       } catch (InterruptedException ex) { ... handle ... }
69  *     }
70  *   }
71  *
72  *   class EmptyingLoop implements Runnable {
73  *     public void run() {
74  *       DataBuffer currentBuffer = initialFullBuffer;
75  *       try {
76  *         while (currentBuffer != null) {
77  *           takeFromBuffer(currentBuffer);
78  *           if (currentBuffer.isEmpty())
79  *             currentBuffer = exchanger.exchange(currentBuffer);
80  *         }
81  *       } catch (InterruptedException ex) { ... handle ...}
82  *     }
83  *   }
84  *
85  *   void start() {
86  *     new Thread(new FillingLoop()).start();
87  *     new Thread(new EmptyingLoop()).start();
88  *   }
89  * }}</pre>
90  *
91  * <p>Memory consistency effects: For each pair of threads that
92  * successfully exchange objects via an {@code Exchanger}, actions
93  * prior to the {@code exchange()} in each thread
94  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
95  * those subsequent to a return from the corresponding {@code exchange()}
96  * in the other thread.
97  *
98  * @since 1.5
99  * @author Doug Lea and Bill Scherer and Michael Scott
100  * @param <V> The type of objects that may be exchanged
101  */
102 public class Exchanger<V> {
103 
104     /*
105      * Overview: The core algorithm is, for an exchange "slot",
106      * and a participant (caller) with an item:
107      *
108      * for (;;) {
109      *   if (slot is empty) {                       // offer
110      *     place item in a Node;
111      *     if (can CAS slot from empty to node) {
112      *       wait for release;
113      *       return matching item in node;
114      *     }
115      *   }
116      *   else if (can CAS slot from node to empty) { // release
117      *     get the item in node;
118      *     set matching item in node;
119      *     release waiting thread;
120      *   }
121      *   // else retry on CAS failure
122      * }
123      *
124      * This is among the simplest forms of a "dual data structure" --
125      * see Scott and Scherer's DISC 04 paper and
126      * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
127      *
128      * This works great in principle. But in practice, like many
129      * algorithms centered on atomic updates to a single location, it
130      * scales horribly when there are more than a few participants
131      * using the same Exchanger. So the implementation instead uses a
132      * form of elimination arena, that spreads out this contention by
133      * arranging that some threads typically use different slots,
134      * while still ensuring that eventually, any two parties will be
135      * able to exchange items. That is, we cannot completely partition
136      * across threads, but instead give threads arena indices that
137      * will on average grow under contention and shrink under lack of
138      * contention. We approach this by defining the Nodes that we need
139      * anyway as ThreadLocals, and include in them per-thread index
140      * and related bookkeeping state. (We can safely reuse per-thread
141      * nodes rather than creating them fresh each time because slots
142      * alternate between pointing to a node vs null, so cannot
143      * encounter ABA problems. However, we do need some care in
144      * resetting them between uses.)
145      *
146      * Implementing an effective arena requires allocating a bunch of
147      * space, so we only do so upon detecting contention (except on
148      * uniprocessors, where they wouldn't help, so aren't used).
149      * Otherwise, exchanges use the single-slot slotExchange method.
150      * On contention, not only must the slots be in different
151      * locations, but the locations must not encounter memory
152      * contention due to being on the same cache line (or more
153      * generally, the same coherence unit).  Because, as of this
154      * writing, there is no way to determine cacheline size, we define
155      * a value that is enough for common platforms.  Additionally,
156      * extra care elsewhere is taken to avoid other false/unintended
157      * sharing and to enhance locality, including adding padding (via
158      * @Contended) to Nodes, embedding "bound" as an Exchanger field,
159      * and reworking some park/unpark mechanics compared to
160      * LockSupport versions.
161      *
162      * The arena starts out with only one used slot. We expand the
163      * effective arena size by tracking collisions; i.e., failed CASes
164      * while trying to exchange. By nature of the above algorithm, the
165      * only kinds of collision that reliably indicate contention are
166      * when two attempted releases collide -- one of two attempted
167      * offers can legitimately fail to CAS without indicating
168      * contention by more than one other thread. (Note: it is possible
169      * but not worthwhile to more precisely detect contention by
170      * reading slot values after CAS failures.)  When a thread has
171      * collided at each slot within the current arena bound, it tries
172      * to expand the arena size by one. We track collisions within
173      * bounds by using a version (sequence) number on the "bound"
174      * field, and conservatively reset collision counts when a
175      * participant notices that bound has been updated (in either
176      * direction).
177      *
178      * The effective arena size is reduced (when there is more than
179      * one slot) by giving up on waiting after a while and trying to
180      * decrement the arena size on expiration. The value of "a while"
181      * is an empirical matter.  We implement by piggybacking on the
182      * use of spin->yield->block that is essential for reasonable
183      * waiting performance anyway -- in a busy exchanger, offers are
184      * usually almost immediately released, in which case context
185      * switching on multiprocessors is extremely slow/wasteful.  Arena
186      * waits just omit the blocking part, and instead cancel. The spin
187      * count is empirically chosen to be a value that avoids blocking
188      * 99% of the time under maximum sustained exchange rates on a
189      * range of test machines. Spins and yields entail some limited
190      * randomness (using a cheap xorshift) to avoid regular patterns
191      * that can induce unproductive grow/shrink cycles. (Using a
192      * pseudorandom also helps regularize spin cycle duration by
193      * making branches unpredictable.)  Also, during an offer, a
194      * waiter can "know" that it will be released when its slot has
195      * changed, but cannot yet proceed until match is set.  In the
196      * mean time it cannot cancel the offer, so instead spins/yields.
197      * Note: It is possible to avoid this secondary check by changing
198      * the linearization point to be a CAS of the match field (as done
199      * in one case in the Scott & Scherer DISC paper), which also
200      * increases asynchrony a bit, at the expense of poorer collision
201      * detection and inability to always reuse per-thread nodes. So
202      * the current scheme is typically a better tradeoff.
203      *
204      * On collisions, indices traverse the arena cyclically in reverse
205      * order, restarting at the maximum index (which will tend to be
206      * sparsest) when bounds change. (On expirations, indices instead
207      * are halved until reaching 0.) It is possible (and has been
208      * tried) to use randomized, prime-value-stepped, or double-hash
209      * style traversal instead of simple cyclic traversal to reduce
210      * bunching.  But empirically, whatever benefits these may have
211      * don't overcome their added overhead: We are managing operations
212      * that occur very quickly unless there is sustained contention,
213      * so simpler/faster control policies work better than more
214      * accurate but slower ones.
215      *
216      * Because we use expiration for arena size control, we cannot
217      * throw TimeoutExceptions in the timed version of the public
218      * exchange method until the arena size has shrunken to zero (or
219      * the arena isn't enabled). This may delay response to timeout
220      * but is still within spec.
221      *
222      * Essentially all of the implementation is in methods
223      * slotExchange and arenaExchange. These have similar overall
224      * structure, but differ in too many details to combine. The
225      * slotExchange method uses the single Exchanger field "slot"
226      * rather than arena array elements. However, it still needs
227      * minimal collision detection to trigger arena construction.
228      * (The messiest part is making sure interrupt status and
229      * InterruptedExceptions come out right during transitions when
230      * both methods may be called. This is done by using null return
231      * as a sentinel to recheck interrupt status.)
232      *
233      * As is too common in this sort of code, methods are monolithic
234      * because most of the logic relies on reads of fields that are
235      * maintained as local variables so can't be nicely factored --
236      * mainly, here, bulky spin->yield->block/cancel code), and
237      * heavily dependent on intrinsics (Unsafe) to use inlined
238      * embedded CAS and related memory access operations (that tend
239      * not to be as readily inlined by dynamic compilers when they are
240      * hidden behind other methods that would more nicely name and
241      * encapsulate the intended effects). This includes the use of
242      * putOrderedX to clear fields of the per-thread Nodes between
243      * uses. Note that field Node.item is not declared as volatile
244      * even though it is read by releasing threads, because they only
245      * do so after CAS operations that must precede access, and all
246      * uses by the owning thread are otherwise acceptably ordered by
247      * other operations. (Because the actual points of atomicity are
248      * slot CASes, it would also be legal for the write to Node.match
249      * in a release to be weaker than a full volatile write. However,
250      * this is not done because it could allow further postponement of
251      * the write, delaying progress.)
252      */
253 
254     /**
255      * The byte distance (as a shift value) between any two used slots
256      * in the arena.  1 << ASHIFT should be at least cacheline size.
257      */
258     private static final int ASHIFT = 7;
259 
260     /**
261      * The maximum supported arena index. The maximum allocatable
262      * arena size is MMASK + 1. Must be a power of two minus one, less
263      * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
264      * for the expected scaling limits of the main algorithms.
265      */
266     private static final int MMASK = 0xff;
267 
268     /**
269      * Unit for sequence/version bits of bound field. Each successful
270      * change to the bound also adds SEQ.
271      */
272     private static final int SEQ = MMASK + 1;
273 
274     /** The number of CPUs, for sizing and spin control */
275     private static final int NCPU = Runtime.getRuntime().availableProcessors();
276 
277     /**
278      * The maximum slot index of the arena: The number of slots that
279      * can in principle hold all threads without contention, or at
280      * most the maximum indexable value.
281      */
282     static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
283 
284     /**
285      * The bound for spins while waiting for a match. The actual
286      * number of iterations will on average be about twice this value
287      * due to randomization. Note: Spinning is disabled when NCPU==1.
288      */
289     private static final int SPINS = 1 << 10;
290 
291     /**
292      * Value representing null arguments/returns from public
293      * methods. Needed because the API originally didn't disallow null
294      * arguments, which it should have.
295      */
296     private static final Object NULL_ITEM = new Object();
297 
298     /**
299      * Sentinel value returned by internal exchange methods upon
300      * timeout, to avoid need for separate timed versions of these
301      * methods.
302      */
303     private static final Object TIMED_OUT = new Object();
304 
305     /**
306      * Nodes hold partially exchanged data, plus other per-thread
307      * bookkeeping. Padded via @Contended to reduce memory contention.
308      */
309     // Android-removed: @Contended, this hint is not used by the Android runtime.
310     //@jdk.internal.vm.annotation.Contended
311     static final class Node {
312         int index;              // Arena index
313         int bound;              // Last recorded value of Exchanger.bound
314         int collides;           // Number of CAS failures at current bound
315         int hash;               // Pseudo-random for spins
316         Object item;            // This thread's current item
317         volatile Object match;  // Item provided by releasing thread
318         volatile Thread parked; // Set to this thread when parked, else null
319     }
320 
321     /** The corresponding thread local class */
322     static final class Participant extends ThreadLocal<Node> {
initialValue()323         public Node initialValue() { return new Node(); }
324     }
325 
326     /**
327      * Per-thread state.
328      */
329     private final Participant participant;
330 
331     /**
332      * Elimination array; null until enabled (within slotExchange).
333      * Element accesses use emulation of volatile gets and CAS.
334      */
335     private volatile Node[] arena;
336 
337     /**
338      * Slot used until contention detected.
339      */
340     private volatile Node slot;
341 
342     /**
343      * The index of the largest valid arena position, OR'ed with SEQ
344      * number in high bits, incremented on each update.  The initial
345      * update from 0 to SEQ is used to ensure that the arena array is
346      * constructed only once.
347      */
348     private volatile int bound;
349 
350     /**
351      * Exchange function when arenas enabled. See above for explanation.
352      *
353      * @param item the (non-null) item to exchange
354      * @param timed true if the wait is timed
355      * @param ns if timed, the maximum wait time, else 0L
356      * @return the other thread's item; or null if interrupted; or
357      * TIMED_OUT if timed and timed out
358      */
arenaExchange(Object item, boolean timed, long ns)359     private final Object arenaExchange(Object item, boolean timed, long ns) {
360         Node[] a = arena;
361         Node p = participant.get();
362         for (int i = p.index;;) {                      // access slot at i
363             int b, m, c; long j;                       // j is raw array offset
364             Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
365             if (q != null && U.compareAndSwapObject(a, j, q, null)) {
366                 Object v = q.item;                     // release
367                 q.match = item;
368                 Thread w = q.parked;
369                 if (w != null)
370                     U.unpark(w);
371                 return v;
372             }
373             else if (i <= (m = (b = bound) & MMASK) && q == null) {
374                 p.item = item;                         // offer
375                 if (U.compareAndSwapObject(a, j, null, p)) {
376                     long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
377                     Thread t = Thread.currentThread(); // wait
378                     for (int h = p.hash, spins = SPINS;;) {
379                         Object v = p.match;
380                         if (v != null) {
381                             U.putOrderedObject(p, MATCH, null);
382                             p.item = null;             // clear for next use
383                             p.hash = h;
384                             return v;
385                         }
386                         else if (spins > 0) {
387                             h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
388                             if (h == 0)                // initialize hash
389                                 h = SPINS | (int)t.getId();
390                             else if (h < 0 &&          // approx 50% true
391                                      (--spins & ((SPINS >>> 1) - 1)) == 0)
392                                 Thread.yield();        // two yields per wait
393                         }
394                         else if (U.getObjectVolatile(a, j) != p)
395                             spins = SPINS;       // releaser hasn't set match yet
396                         else if (!t.isInterrupted() && m == 0 &&
397                                  (!timed ||
398                                   (ns = end - System.nanoTime()) > 0L)) {
399                             U.putObject(t, BLOCKER, this); // emulate LockSupport
400                             p.parked = t;              // minimize window
401                             if (U.getObjectVolatile(a, j) == p)
402                                 U.park(false, ns);
403                             p.parked = null;
404                             U.putObject(t, BLOCKER, null);
405                         }
406                         else if (U.getObjectVolatile(a, j) == p &&
407                                  U.compareAndSwapObject(a, j, p, null)) {
408                             if (m != 0)                // try to shrink
409                                 U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
410                             p.item = null;
411                             p.hash = h;
412                             i = p.index >>>= 1;        // descend
413                             if (Thread.interrupted())
414                                 return null;
415                             if (timed && m == 0 && ns <= 0L)
416                                 return TIMED_OUT;
417                             break;                     // expired; restart
418                         }
419                     }
420                 }
421                 else
422                     p.item = null;                     // clear offer
423             }
424             else {
425                 if (p.bound != b) {                    // stale; reset
426                     p.bound = b;
427                     p.collides = 0;
428                     i = (i != m || m == 0) ? m : m - 1;
429                 }
430                 else if ((c = p.collides) < m || m == FULL ||
431                          !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
432                     p.collides = c + 1;
433                     i = (i == 0) ? m : i - 1;          // cyclically traverse
434                 }
435                 else
436                     i = m + 1;                         // grow
437                 p.index = i;
438             }
439         }
440     }
441 
442     /**
443      * Exchange function used until arenas enabled. See above for explanation.
444      *
445      * @param item the item to exchange
446      * @param timed true if the wait is timed
447      * @param ns if timed, the maximum wait time, else 0L
448      * @return the other thread's item; or null if either the arena
449      * was enabled or the thread was interrupted before completion; or
450      * TIMED_OUT if timed and timed out
451      */
slotExchange(Object item, boolean timed, long ns)452     private final Object slotExchange(Object item, boolean timed, long ns) {
453         Node p = participant.get();
454         Thread t = Thread.currentThread();
455         if (t.isInterrupted()) // preserve interrupt status so caller can recheck
456             return null;
457 
458         for (Node q;;) {
459             if ((q = slot) != null) {
460                 if (U.compareAndSwapObject(this, SLOT, q, null)) {
461                     Object v = q.item;
462                     q.match = item;
463                     Thread w = q.parked;
464                     if (w != null)
465                         U.unpark(w);
466                     return v;
467                 }
468                 // create arena on contention, but continue until slot null
469                 if (NCPU > 1 && bound == 0 &&
470                     U.compareAndSwapInt(this, BOUND, 0, SEQ))
471                     arena = new Node[(FULL + 2) << ASHIFT];
472             }
473             else if (arena != null)
474                 return null; // caller must reroute to arenaExchange
475             else {
476                 p.item = item;
477                 if (U.compareAndSwapObject(this, SLOT, null, p))
478                     break;
479                 p.item = null;
480             }
481         }
482 
483         // await release
484         int h = p.hash;
485         long end = timed ? System.nanoTime() + ns : 0L;
486         int spins = (NCPU > 1) ? SPINS : 1;
487         Object v;
488         while ((v = p.match) == null) {
489             if (spins > 0) {
490                 h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
491                 if (h == 0)
492                     h = SPINS | (int)t.getId();
493                 else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
494                     Thread.yield();
495             }
496             else if (slot != p)
497                 spins = SPINS;
498             else if (!t.isInterrupted() && arena == null &&
499                      (!timed || (ns = end - System.nanoTime()) > 0L)) {
500                 U.putObject(t, BLOCKER, this);
501                 p.parked = t;
502                 if (slot == p)
503                     U.park(false, ns);
504                 p.parked = null;
505                 U.putObject(t, BLOCKER, null);
506             }
507             else if (U.compareAndSwapObject(this, SLOT, p, null)) {
508                 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
509                 break;
510             }
511         }
512         U.putOrderedObject(p, MATCH, null);
513         p.item = null;
514         p.hash = h;
515         return v;
516     }
517 
518     /**
519      * Creates a new Exchanger.
520      */
Exchanger()521     public Exchanger() {
522         participant = new Participant();
523     }
524 
525     /**
526      * Waits for another thread to arrive at this exchange point (unless
527      * the current thread is {@linkplain Thread#interrupt interrupted}),
528      * and then transfers the given object to it, receiving its object
529      * in return.
530      *
531      * <p>If another thread is already waiting at the exchange point then
532      * it is resumed for thread scheduling purposes and receives the object
533      * passed in by the current thread.  The current thread returns immediately,
534      * receiving the object passed to the exchange by that other thread.
535      *
536      * <p>If no other thread is already waiting at the exchange then the
537      * current thread is disabled for thread scheduling purposes and lies
538      * dormant until one of two things happens:
539      * <ul>
540      * <li>Some other thread enters the exchange; or
541      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
542      * the current thread.
543      * </ul>
544      * <p>If the current thread:
545      * <ul>
546      * <li>has its interrupted status set on entry to this method; or
547      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
548      * for the exchange,
549      * </ul>
550      * then {@link InterruptedException} is thrown and the current thread's
551      * interrupted status is cleared.
552      *
553      * @param x the object to exchange
554      * @return the object provided by the other thread
555      * @throws InterruptedException if the current thread was
556      *         interrupted while waiting
557      */
558     @SuppressWarnings("unchecked")
exchange(V x)559     public V exchange(V x) throws InterruptedException {
560         Object v;
561         Object item = (x == null) ? NULL_ITEM : x; // translate null args
562         if ((arena != null ||
563              (v = slotExchange(item, false, 0L)) == null) &&
564             ((Thread.interrupted() || // disambiguates null return
565               (v = arenaExchange(item, false, 0L)) == null)))
566             throw new InterruptedException();
567         return (v == NULL_ITEM) ? null : (V)v;
568     }
569 
570     /**
571      * Waits for another thread to arrive at this exchange point (unless
572      * the current thread is {@linkplain Thread#interrupt interrupted} or
573      * the specified waiting time elapses), and then transfers the given
574      * object to it, receiving its object in return.
575      *
576      * <p>If another thread is already waiting at the exchange point then
577      * it is resumed for thread scheduling purposes and receives the object
578      * passed in by the current thread.  The current thread returns immediately,
579      * receiving the object passed to the exchange by that other thread.
580      *
581      * <p>If no other thread is already waiting at the exchange then the
582      * current thread is disabled for thread scheduling purposes and lies
583      * dormant until one of three things happens:
584      * <ul>
585      * <li>Some other thread enters the exchange; or
586      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
587      * the current thread; or
588      * <li>The specified waiting time elapses.
589      * </ul>
590      * <p>If the current thread:
591      * <ul>
592      * <li>has its interrupted status set on entry to this method; or
593      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
594      * for the exchange,
595      * </ul>
596      * then {@link InterruptedException} is thrown and the current thread's
597      * interrupted status is cleared.
598      *
599      * <p>If the specified waiting time elapses then {@link
600      * TimeoutException} is thrown.  If the time is less than or equal
601      * to zero, the method will not wait at all.
602      *
603      * @param x the object to exchange
604      * @param timeout the maximum time to wait
605      * @param unit the time unit of the {@code timeout} argument
606      * @return the object provided by the other thread
607      * @throws InterruptedException if the current thread was
608      *         interrupted while waiting
609      * @throws TimeoutException if the specified waiting time elapses
610      *         before another thread enters the exchange
611      */
612     @SuppressWarnings("unchecked")
exchange(V x, long timeout, TimeUnit unit)613     public V exchange(V x, long timeout, TimeUnit unit)
614         throws InterruptedException, TimeoutException {
615         Object v;
616         Object item = (x == null) ? NULL_ITEM : x;
617         long ns = unit.toNanos(timeout);
618         if ((arena != null ||
619              (v = slotExchange(item, true, ns)) == null) &&
620             ((Thread.interrupted() ||
621               (v = arenaExchange(item, true, ns)) == null)))
622             throw new InterruptedException();
623         if (v == TIMED_OUT)
624             throw new TimeoutException();
625         return (v == NULL_ITEM) ? null : (V)v;
626     }
627 
628     // Unsafe mechanics
629     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
630     private static final long BOUND;
631     private static final long SLOT;
632     private static final long MATCH;
633     private static final long BLOCKER;
634     private static final int ABASE;
635     static {
636         try {
637             BOUND = U.objectFieldOffset
638                 (Exchanger.class.getDeclaredField("bound"));
639             SLOT = U.objectFieldOffset
640                 (Exchanger.class.getDeclaredField("slot"));
641 
642             MATCH = U.objectFieldOffset
643                 (Node.class.getDeclaredField("match"));
644 
645             BLOCKER = U.objectFieldOffset
646                 (Thread.class.getDeclaredField("parkBlocker"));
647 
648             int scale = U.arrayIndexScale(Node[].class);
649             if ((scale & (scale - 1)) != 0 || scale > (1 << ASHIFT))
650                 throw new Error("Unsupported array scale");
651             // ABASE absorbs padding in front of element 0
652             ABASE = U.arrayBaseOffset(Node[].class) + (1 << ASHIFT);
653         } catch (ReflectiveOperationException e) {
654             throw new Error(e);
655         }
656     }
657 
658 }
659