• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.security.AccessController;
39 import java.security.AccessControlContext;
40 import java.security.PrivilegedAction;
41 import java.security.ProtectionDomain;
42 
43 /**
44  * A thread managed by a {@link ForkJoinPool}, which executes
45  * {@link ForkJoinTask}s.
46  * This class is subclassable solely for the sake of adding
47  * functionality -- there are no overridable methods dealing with
48  * scheduling or execution.  However, you can override initialization
49  * and termination methods surrounding the main task processing loop.
50  * If you do create such a subclass, you will also need to supply a
51  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
52  * {@linkplain ForkJoinPool#ForkJoinPool(int, ForkJoinWorkerThreadFactory,
53  * UncaughtExceptionHandler, boolean, int, int, int, Predicate, long, TimeUnit)
54  * use it} in a {@code ForkJoinPool}.
55  *
56  * @since 1.7
57  * @author Doug Lea
58  */
59 public class ForkJoinWorkerThread extends Thread {
60     /*
61      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
62      * ForkJoinTasks. For explanation, see the internal documentation
63      * of class ForkJoinPool.
64      *
65      * This class just maintains links to its pool and WorkQueue.
66      */
67 
68     final ForkJoinPool pool;                // the pool this thread works in
69     final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
70 
71     /**
72      * Full nonpublic constructor.
73      */
ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool, boolean useSystemClassLoader, boolean clearThreadLocals)74     ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool,
75                          boolean useSystemClassLoader,
76                          boolean clearThreadLocals) {
77         super(group, null, pool.nextWorkerThreadName(), 0L, !clearThreadLocals);
78         UncaughtExceptionHandler handler = (this.pool = pool).ueh;
79         this.workQueue = new ForkJoinPool.WorkQueue(this, 0);
80         if (clearThreadLocals)
81             workQueue.setClearThreadLocals();
82         super.setDaemon(true);
83         if (handler != null)
84             super.setUncaughtExceptionHandler(handler);
85         if (useSystemClassLoader)
86             super.setContextClassLoader(ClassLoader.getSystemClassLoader());
87     }
88 
89     /**
90      * Creates a ForkJoinWorkerThread operating in the given thread group and
91      * pool, and with the given policy for preserving ThreadLocals.
92      *
93      * @param group if non-null, the thread group for this
94      * thread. Otherwise, the thread group is chosen by the security
95      * manager if present, else set to the current thread's thread
96      * group.
97      * @param pool the pool this thread works in
98      * @param preserveThreadLocals if true, always preserve the values of
99      * ThreadLocal variables across tasks; otherwise they may be cleared.
100      * @throws NullPointerException if pool is null
101      * @since 19
102      */
ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool, boolean preserveThreadLocals)103     protected ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool,
104                                    boolean preserveThreadLocals) {
105         this(group, pool, false, !preserveThreadLocals);
106     }
107 
108     /**
109      * Creates a ForkJoinWorkerThread operating in the given pool.
110      *
111      * @param pool the pool this thread works in
112      * @throws NullPointerException if pool is null
113      */
ForkJoinWorkerThread(ForkJoinPool pool)114     protected ForkJoinWorkerThread(ForkJoinPool pool) {
115         this(null, pool, false, false);
116     }
117 
118     /**
119      * Returns the pool hosting this thread.
120      *
121      * @return the pool
122      */
getPool()123     public ForkJoinPool getPool() {
124         return pool;
125     }
126 
127     /**
128      * Returns the unique index number of this thread in its pool.
129      * The returned value ranges from zero to the maximum number of
130      * threads (minus one) that may exist in the pool, and does not
131      * change during the lifetime of the thread.  This method may be
132      * useful for applications that track status or collect results
133      * per-worker-thread rather than per-task.
134      *
135      * @return the index number
136      */
getPoolIndex()137     public int getPoolIndex() {
138         return workQueue.getPoolIndex();
139     }
140 
141     /**
142      * {@return a (non-negative) estimate of the number of tasks in the
143      * thread's queue}
144      *
145      * @since 20
146      * @see ForkJoinPool#getQueuedTaskCount()
147      */
getQueuedTaskCount()148     public int getQueuedTaskCount() {
149         return workQueue.queueSize();
150     }
151 
152     /**
153      * Initializes internal state after construction but before
154      * processing any tasks. If you override this method, you must
155      * invoke {@code super.onStart()} at the beginning of the method.
156      * Initialization requires care: Most fields must have legal
157      * default values, to ensure that attempted accesses from other
158      * threads work correctly even before this thread starts
159      * processing tasks.
160      */
onStart()161     protected void onStart() {
162     }
163 
164     /**
165      * Performs cleanup associated with termination of this worker
166      * thread.  If you override this method, you must invoke
167      * {@code super.onTermination} at the end of the overridden method.
168      *
169      * @param exception the exception causing this thread to abort due
170      * to an unrecoverable error, or {@code null} if completed normally
171      */
onTermination(Throwable exception)172     protected void onTermination(Throwable exception) {
173     }
174 
175     /**
176      * This method is required to be public, but should never be
177      * called explicitly. It performs the main run loop to execute
178      * {@link ForkJoinTask}s.
179      */
run()180     public void run() {
181         Throwable exception = null;
182         ForkJoinPool p = pool;
183         ForkJoinPool.WorkQueue w = workQueue;
184         if (p != null && w != null) {   // skip on failed initialization
185             try {
186                 p.registerWorker(w);
187                 onStart();
188                 p.runWorker(w);
189             } catch (Throwable ex) {
190                 exception = ex;
191             } finally {
192                 try {
193                     onTermination(exception);
194                 } catch (Throwable ex) {
195                     if (exception == null)
196                         exception = ex;
197                 } finally {
198                     p.deregisterWorker(this, exception);
199                 }
200             }
201         }
202     }
203 
204     /**
205      * A worker thread that has no permissions, is not a member of any
206      * user-defined ThreadGroup, uses the system class loader as
207      * thread context class loader, and clears all ThreadLocals after
208      * running each top-level task.
209      */
210     static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
211         /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
212         private static final ThreadGroup innocuousThreadGroup;
213         @SuppressWarnings("removal")
214         private static final AccessControlContext innocuousACC;
InnocuousForkJoinWorkerThread(ForkJoinPool pool)215         InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
216             super(innocuousThreadGroup, pool, true, true);
217         }
218 
219         @Override @SuppressWarnings("removal")
onStart()220         protected void onStart() {
221             Thread t = Thread.currentThread();
222             ThreadLocalRandom.setInheritedAccessControlContext(t, innocuousACC);
223         }
224 
225         @Override // to silently fail
setUncaughtExceptionHandler(UncaughtExceptionHandler x)226         public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
227 
228         @Override // paranoically
setContextClassLoader(ClassLoader cl)229         public void setContextClassLoader(ClassLoader cl) {
230             if (cl != null && ClassLoader.getSystemClassLoader() != cl)
231                 throw new SecurityException("setContextClassLoader");
232         }
233 
234         @SuppressWarnings("removal")
createACC()235         static AccessControlContext createACC() {
236             return new AccessControlContext(
237                 new ProtectionDomain[] { new ProtectionDomain(null, null) });
238         }
createGroup()239         static ThreadGroup createGroup() {
240             ThreadGroup group = Thread.currentThread().getThreadGroup();
241             for (ThreadGroup p; (p = group.getParent()) != null; )
242                 group = p;
243             return new ThreadGroup(group, "InnocuousForkJoinWorkerThreadGroup");
244         }
245         static {
246             @SuppressWarnings("removal")
247             SecurityManager sm = System.getSecurityManager();
248             @SuppressWarnings("removal")
249             ThreadGroup g = innocuousThreadGroup =
250                 (sm == null) ? createGroup() :
251                 AccessController.doPrivileged(new PrivilegedAction<>() {
252                         public ThreadGroup run() {
253                             return createGroup(); }});
254             @SuppressWarnings("removal")
255             AccessControlContext a = innocuousACC =
256                 (sm == null) ? createACC() :
257                 AccessController.doPrivileged(new PrivilegedAction<>() {
258                         public AccessControlContext run() {
259                             return createACC(); }});
260         }
261     }
262 }
263