/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
24 
/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
35 
36 package java.util.concurrent;
37 
38 import java.security.AccessControlContext;
39 import java.security.ProtectionDomain;
40 
41 /**
42  * A thread managed by a {@link ForkJoinPool}, which executes
43  * {@link ForkJoinTask}s.
44  * This class is subclassable solely for the sake of adding
45  * functionality -- there are no overridable methods dealing with
46  * scheduling or execution.  However, you can override initialization
47  * and termination methods surrounding the main task processing loop.
48  * If you do create such a subclass, you will also need to supply a
49  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
50  * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
51  *
52  * @since 1.7
53  * @author Doug Lea
54  */
55 public class ForkJoinWorkerThread extends Thread {
56     /*
57      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
58      * ForkJoinTasks. For explanation, see the internal documentation
59      * of class ForkJoinPool.
60      *
61      * This class just maintains links to its pool and WorkQueue.  The
62      * pool field is set immediately upon construction, but the
63      * workQueue field is not set until a call to registerWorker
64      * completes. This leads to a visibility race, that is tolerated
65      * by requiring that the workQueue field is only accessed by the
66      * owning thread.
67      *
68      * Support for (non-public) subclass InnocuousForkJoinWorkerThread
69      * requires that we break quite a lot of encapsulation (via Unsafe)
70      * both here and in the subclass to access and set Thread fields.
71      */
72 
73     final ForkJoinPool pool;                // the pool this thread works in
74     final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
75 
76     /**
77      * Creates a ForkJoinWorkerThread operating in the given pool.
78      *
79      * @param pool the pool this thread works in
80      * @throws NullPointerException if pool is null
81      */
ForkJoinWorkerThread(ForkJoinPool pool)82     protected ForkJoinWorkerThread(ForkJoinPool pool) {
83         // Use a placeholder until a useful name can be set in registerWorker
84         super("aForkJoinWorkerThread");
85         this.pool = pool;
86         this.workQueue = pool.registerWorker(this);
87     }
88 
89     /**
90      * Version for InnocuousForkJoinWorkerThread.
91      */
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc)92     ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
93                          AccessControlContext acc) {
94         super(threadGroup, null, "aForkJoinWorkerThread");
95         U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
96         eraseThreadLocals(); // clear before registering
97         this.pool = pool;
98         this.workQueue = pool.registerWorker(this);
99     }
100 
101     /**
102      * Returns the pool hosting this thread.
103      *
104      * @return the pool
105      */
getPool()106     public ForkJoinPool getPool() {
107         return pool;
108     }
109 
110     /**
111      * Returns the unique index number of this thread in its pool.
112      * The returned value ranges from zero to the maximum number of
113      * threads (minus one) that may exist in the pool, and does not
114      * change during the lifetime of the thread.  This method may be
115      * useful for applications that track status or collect results
116      * per-worker-thread rather than per-task.
117      *
118      * @return the index number
119      */
getPoolIndex()120     public int getPoolIndex() {
121         return workQueue.getPoolIndex();
122     }
123 
124     /**
125      * Initializes internal state after construction but before
126      * processing any tasks. If you override this method, you must
127      * invoke {@code super.onStart()} at the beginning of the method.
128      * Initialization requires care: Most fields must have legal
129      * default values, to ensure that attempted accesses from other
130      * threads work correctly even before this thread starts
131      * processing tasks.
132      */
onStart()133     protected void onStart() {
134     }
135 
136     /**
137      * Performs cleanup associated with termination of this worker
138      * thread.  If you override this method, you must invoke
139      * {@code super.onTermination} at the end of the overridden method.
140      *
141      * @param exception the exception causing this thread to abort due
142      * to an unrecoverable error, or {@code null} if completed normally
143      */
onTermination(Throwable exception)144     protected void onTermination(Throwable exception) {
145     }
146 
147     /**
148      * This method is required to be public, but should never be
149      * called explicitly. It performs the main run loop to execute
150      * {@link ForkJoinTask}s.
151      */
run()152     public void run() {
153         if (workQueue.array == null) { // only run once
154             Throwable exception = null;
155             try {
156                 onStart();
157                 pool.runWorker(workQueue);
158             } catch (Throwable ex) {
159                 exception = ex;
160             } finally {
161                 try {
162                     onTermination(exception);
163                 } catch (Throwable ex) {
164                     if (exception == null)
165                         exception = ex;
166                 } finally {
167                     pool.deregisterWorker(this, exception);
168                 }
169             }
170         }
171     }
172 
173     /**
174      * Erases ThreadLocals by nulling out Thread maps.
175      */
eraseThreadLocals()176     final void eraseThreadLocals() {
177         U.putObject(this, THREADLOCALS, null);
178         U.putObject(this, INHERITABLETHREADLOCALS, null);
179     }
180 
181     /**
182      * Non-public hook method for InnocuousForkJoinWorkerThread.
183      */
afterTopLevelExec()184     void afterTopLevelExec() {
185     }
186 
187     // Set up to allow setting thread fields in constructor
188     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
189     private static final long THREADLOCALS;
190     private static final long INHERITABLETHREADLOCALS;
191     private static final long INHERITEDACCESSCONTROLCONTEXT;
192     static {
193         try {
194             THREADLOCALS = U.objectFieldOffset
195                 (Thread.class.getDeclaredField("threadLocals"));
196             INHERITABLETHREADLOCALS = U.objectFieldOffset
197                 (Thread.class.getDeclaredField("inheritableThreadLocals"));
198             INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset
199                 (Thread.class.getDeclaredField("inheritedAccessControlContext"));
200         } catch (ReflectiveOperationException e) {
201             throw new Error(e);
202         }
203     }
204 
205     /**
206      * A worker thread that has no permissions, is not a member of any
207      * user-defined ThreadGroup, and erases all ThreadLocals after
208      * running each top-level task.
209      */
210     static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
211         /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
212         private static final ThreadGroup innocuousThreadGroup =
213             createThreadGroup();
214 
215         /** An AccessControlContext supporting no privileges */
216         private static final AccessControlContext INNOCUOUS_ACC =
217             new AccessControlContext(
218                 new ProtectionDomain[] {
219                     new ProtectionDomain(null, null)
220                 });
221 
InnocuousForkJoinWorkerThread(ForkJoinPool pool)222         InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
223             super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
224         }
225 
226         @Override // to erase ThreadLocals
afterTopLevelExec()227         void afterTopLevelExec() {
228             eraseThreadLocals();
229         }
230 
231         @Override // to always report system loader
getContextClassLoader()232         public ClassLoader getContextClassLoader() {
233             return ClassLoader.getSystemClassLoader();
234         }
235 
236         @Override // to silently fail
setUncaughtExceptionHandler(UncaughtExceptionHandler x)237         public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
238 
239         @Override // paranoically
setContextClassLoader(ClassLoader cl)240         public void setContextClassLoader(ClassLoader cl) {
241             throw new SecurityException("setContextClassLoader");
242         }
243 
244         /**
245          * Returns a new group with the system ThreadGroup (the
246          * topmost, parent-less group) as parent.  Uses Unsafe to
247          * traverse Thread.group and ThreadGroup.parent fields.
248          */
createThreadGroup()249         private static ThreadGroup createThreadGroup() {
250             try {
251                 sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
252                 long tg = u.objectFieldOffset
253                     (Thread.class.getDeclaredField("group"));
254                 long gp = u.objectFieldOffset
255                     (ThreadGroup.class.getDeclaredField("parent"));
256                 ThreadGroup group = (ThreadGroup)
257                     u.getObject(Thread.currentThread(), tg);
258                 while (group != null) {
259                     ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
260                     if (parent == null)
261                         return new ThreadGroup(group,
262                                                "InnocuousForkJoinWorkerThreadGroup");
263                     group = parent;
264                 }
265             } catch (ReflectiveOperationException e) {
266                 throw new Error(e);
267             }
268             // fall through if null as cannot-happen safeguard
269             throw new Error("Cannot create ThreadGroup");
270         }
271     }
272 
273 }
274