• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
6 
7 package java.util.concurrent;
8 
9 import java.security.AccessControlContext;
10 import java.security.ProtectionDomain;
11 
12 /**
13  * A thread managed by a {@link ForkJoinPool}, which executes
14  * {@link ForkJoinTask}s.
15  * This class is subclassable solely for the sake of adding
16  * functionality -- there are no overridable methods dealing with
17  * scheduling or execution.  However, you can override initialization
18  * and termination methods surrounding the main task processing loop.
19  * If you do create such a subclass, you will also need to supply a
20  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
21  * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
22  *
23  * @since 1.7
24  * @author Doug Lea
25  */
26 public class ForkJoinWorkerThread extends Thread {
27     /*
28      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
29      * ForkJoinTasks. For explanation, see the internal documentation
30      * of class ForkJoinPool.
31      *
32      * This class just maintains links to its pool and WorkQueue.  The
33      * pool field is set immediately upon construction, but the
34      * workQueue field is not set until a call to registerWorker
35      * completes. This leads to a visibility race, that is tolerated
36      * by requiring that the workQueue field is only accessed by the
37      * owning thread.
38      *
39      * Support for (non-public) subclass InnocuousForkJoinWorkerThread
40      * requires that we break quite a lot of encapsulation (via Unsafe)
41      * both here and in the subclass to access and set Thread fields.
42      */
43 
44     final ForkJoinPool pool;                // the pool this thread works in
45     final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
46 
47     /**
48      * Creates a ForkJoinWorkerThread operating in the given pool.
49      *
50      * @param pool the pool this thread works in
51      * @throws NullPointerException if pool is null
52      */
ForkJoinWorkerThread(ForkJoinPool pool)53     protected ForkJoinWorkerThread(ForkJoinPool pool) {
54         // Use a placeholder until a useful name can be set in registerWorker
55         super("aForkJoinWorkerThread");
56         this.pool = pool;
57         this.workQueue = pool.registerWorker(this);
58     }
59 
60     /**
61      * Version for InnocuousForkJoinWorkerThread.
62      */
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc)63     ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
64                          AccessControlContext acc) {
65         super(threadGroup, null, "aForkJoinWorkerThread");
66         U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
67         eraseThreadLocals(); // clear before registering
68         this.pool = pool;
69         this.workQueue = pool.registerWorker(this);
70     }
71 
72     /**
73      * Returns the pool hosting this thread.
74      *
75      * @return the pool
76      */
getPool()77     public ForkJoinPool getPool() {
78         return pool;
79     }
80 
81     /**
82      * Returns the unique index number of this thread in its pool.
83      * The returned value ranges from zero to the maximum number of
84      * threads (minus one) that may exist in the pool, and does not
85      * change during the lifetime of the thread.  This method may be
86      * useful for applications that track status or collect results
87      * per-worker-thread rather than per-task.
88      *
89      * @return the index number
90      */
getPoolIndex()91     public int getPoolIndex() {
92         return workQueue.getPoolIndex();
93     }
94 
95     /**
96      * Initializes internal state after construction but before
97      * processing any tasks. If you override this method, you must
98      * invoke {@code super.onStart()} at the beginning of the method.
99      * Initialization requires care: Most fields must have legal
100      * default values, to ensure that attempted accesses from other
101      * threads work correctly even before this thread starts
102      * processing tasks.
103      */
onStart()104     protected void onStart() {
105     }
106 
107     /**
108      * Performs cleanup associated with termination of this worker
109      * thread.  If you override this method, you must invoke
110      * {@code super.onTermination} at the end of the overridden method.
111      *
112      * @param exception the exception causing this thread to abort due
113      * to an unrecoverable error, or {@code null} if completed normally
114      */
onTermination(Throwable exception)115     protected void onTermination(Throwable exception) {
116     }
117 
118     /**
119      * This method is required to be public, but should never be
120      * called explicitly. It performs the main run loop to execute
121      * {@link ForkJoinTask}s.
122      */
run()123     public void run() {
124         if (workQueue.array == null) { // only run once
125             Throwable exception = null;
126             try {
127                 onStart();
128                 pool.runWorker(workQueue);
129             } catch (Throwable ex) {
130                 exception = ex;
131             } finally {
132                 try {
133                     onTermination(exception);
134                 } catch (Throwable ex) {
135                     if (exception == null)
136                         exception = ex;
137                 } finally {
138                     pool.deregisterWorker(this, exception);
139                 }
140             }
141         }
142     }
143 
144     /**
145      * Erases ThreadLocals by nulling out Thread maps.
146      */
eraseThreadLocals()147     final void eraseThreadLocals() {
148         U.putObject(this, THREADLOCALS, null);
149         U.putObject(this, INHERITABLETHREADLOCALS, null);
150     }
151 
152     /**
153      * Non-public hook method for InnocuousForkJoinWorkerThread.
154      */
afterTopLevelExec()155     void afterTopLevelExec() {
156     }
157 
158     // Set up to allow setting thread fields in constructor
159     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
160     private static final long THREADLOCALS;
161     private static final long INHERITABLETHREADLOCALS;
162     private static final long INHERITEDACCESSCONTROLCONTEXT;
163     static {
164         try {
165             THREADLOCALS = U.objectFieldOffset
166                 (Thread.class.getDeclaredField("threadLocals"));
167             INHERITABLETHREADLOCALS = U.objectFieldOffset
168                 (Thread.class.getDeclaredField("inheritableThreadLocals"));
169             INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset
170                 (Thread.class.getDeclaredField("inheritedAccessControlContext"));
171         } catch (ReflectiveOperationException e) {
172             throw new Error(e);
173         }
174     }
175 
176     /**
177      * A worker thread that has no permissions, is not a member of any
178      * user-defined ThreadGroup, and erases all ThreadLocals after
179      * running each top-level task.
180      */
181     static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
182         /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
183         private static final ThreadGroup innocuousThreadGroup =
184             createThreadGroup();
185 
186         /** An AccessControlContext supporting no privileges */
187         private static final AccessControlContext INNOCUOUS_ACC =
188             new AccessControlContext(
189                 new ProtectionDomain[] {
190                     new ProtectionDomain(null, null)
191                 });
192 
InnocuousForkJoinWorkerThread(ForkJoinPool pool)193         InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
194             super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
195         }
196 
197         @Override // to erase ThreadLocals
afterTopLevelExec()198         void afterTopLevelExec() {
199             eraseThreadLocals();
200         }
201 
202         @Override // to always report system loader
getContextClassLoader()203         public ClassLoader getContextClassLoader() {
204             return ClassLoader.getSystemClassLoader();
205         }
206 
207         @Override // to silently fail
setUncaughtExceptionHandler(UncaughtExceptionHandler x)208         public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
209 
210         @Override // paranoically
setContextClassLoader(ClassLoader cl)211         public void setContextClassLoader(ClassLoader cl) {
212             throw new SecurityException("setContextClassLoader");
213         }
214 
215         /**
216          * Returns a new group with the system ThreadGroup (the
217          * topmost, parent-less group) as parent.  Uses Unsafe to
218          * traverse Thread.group and ThreadGroup.parent fields.
219          */
createThreadGroup()220         private static ThreadGroup createThreadGroup() {
221             try {
222                 sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
223                 long tg = u.objectFieldOffset
224                     (Thread.class.getDeclaredField("group"));
225                 long gp = u.objectFieldOffset
226                     (ThreadGroup.class.getDeclaredField("parent"));
227                 ThreadGroup group = (ThreadGroup)
228                     u.getObject(Thread.currentThread(), tg);
229                 while (group != null) {
230                     ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
231                     if (parent == null)
232                         return new ThreadGroup(group,
233                                                "InnocuousForkJoinWorkerThreadGroup");
234                     group = parent;
235                 }
236             } catch (ReflectiveOperationException e) {
237                 throw new Error(e);
238             }
239             // fall through if null as cannot-happen safeguard
240             throw new Error("Cannot create ThreadGroup");
241         }
242     }
243 
244 }
245