1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 import java.security.AccessControlContext; 10 import java.security.ProtectionDomain; 11 12 /** 13 * A thread managed by a {@link ForkJoinPool}, which executes 14 * {@link ForkJoinTask}s. 15 * This class is subclassable solely for the sake of adding 16 * functionality -- there are no overridable methods dealing with 17 * scheduling or execution. However, you can override initialization 18 * and termination methods surrounding the main task processing loop. 19 * If you do create such a subclass, you will also need to supply a 20 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to 21 * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}. 22 * 23 * @since 1.7 24 * @author Doug Lea 25 */ 26 public class ForkJoinWorkerThread extends Thread { 27 /* 28 * ForkJoinWorkerThreads are managed by ForkJoinPools and perform 29 * ForkJoinTasks. For explanation, see the internal documentation 30 * of class ForkJoinPool. 31 * 32 * This class just maintains links to its pool and WorkQueue. The 33 * pool field is set immediately upon construction, but the 34 * workQueue field is not set until a call to registerWorker 35 * completes. This leads to a visibility race, that is tolerated 36 * by requiring that the workQueue field is only accessed by the 37 * owning thread. 38 * 39 * Support for (non-public) subclass InnocuousForkJoinWorkerThread 40 * requires that we break quite a lot of encapsulation (via Unsafe) 41 * both here and in the subclass to access and set Thread fields. 42 */ 43 44 final ForkJoinPool pool; // the pool this thread works in 45 final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics 46 47 /** 48 * Creates a ForkJoinWorkerThread operating in the given pool. 49 * 50 * @param pool the pool this thread works in 51 * @throws NullPointerException if pool is null 52 */ ForkJoinWorkerThread(ForkJoinPool pool)53 protected ForkJoinWorkerThread(ForkJoinPool pool) { 54 // Use a placeholder until a useful name can be set in registerWorker 55 super("aForkJoinWorkerThread"); 56 this.pool = pool; 57 this.workQueue = pool.registerWorker(this); 58 } 59 60 /** 61 * Version for InnocuousForkJoinWorkerThread. 62 */ ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc)63 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, 64 AccessControlContext acc) { 65 super(threadGroup, null, "aForkJoinWorkerThread"); 66 U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc); 67 eraseThreadLocals(); // clear before registering 68 this.pool = pool; 69 this.workQueue = pool.registerWorker(this); 70 } 71 72 /** 73 * Returns the pool hosting this thread. 74 * 75 * @return the pool 76 */ getPool()77 public ForkJoinPool getPool() { 78 return pool; 79 } 80 81 /** 82 * Returns the unique index number of this thread in its pool. 83 * The returned value ranges from zero to the maximum number of 84 * threads (minus one) that may exist in the pool, and does not 85 * change during the lifetime of the thread. This method may be 86 * useful for applications that track status or collect results 87 * per-worker-thread rather than per-task. 88 * 89 * @return the index number 90 */ getPoolIndex()91 public int getPoolIndex() { 92 return workQueue.getPoolIndex(); 93 } 94 95 /** 96 * Initializes internal state after construction but before 97 * processing any tasks. If you override this method, you must 98 * invoke {@code super.onStart()} at the beginning of the method. 99 * Initialization requires care: Most fields must have legal 100 * default values, to ensure that attempted accesses from other 101 * threads work correctly even before this thread starts 102 * processing tasks. 103 */ onStart()104 protected void onStart() { 105 } 106 107 /** 108 * Performs cleanup associated with termination of this worker 109 * thread. If you override this method, you must invoke 110 * {@code super.onTermination} at the end of the overridden method. 111 * 112 * @param exception the exception causing this thread to abort due 113 * to an unrecoverable error, or {@code null} if completed normally 114 */ onTermination(Throwable exception)115 protected void onTermination(Throwable exception) { 116 } 117 118 /** 119 * This method is required to be public, but should never be 120 * called explicitly. It performs the main run loop to execute 121 * {@link ForkJoinTask}s. 122 */ run()123 public void run() { 124 if (workQueue.array == null) { // only run once 125 Throwable exception = null; 126 try { 127 onStart(); 128 pool.runWorker(workQueue); 129 } catch (Throwable ex) { 130 exception = ex; 131 } finally { 132 try { 133 onTermination(exception); 134 } catch (Throwable ex) { 135 if (exception == null) 136 exception = ex; 137 } finally { 138 pool.deregisterWorker(this, exception); 139 } 140 } 141 } 142 } 143 144 /** 145 * Erases ThreadLocals by nulling out Thread maps. 146 */ eraseThreadLocals()147 final void eraseThreadLocals() { 148 U.putObject(this, THREADLOCALS, null); 149 U.putObject(this, INHERITABLETHREADLOCALS, null); 150 } 151 152 /** 153 * Non-public hook method for InnocuousForkJoinWorkerThread. 154 */ afterTopLevelExec()155 void afterTopLevelExec() { 156 } 157 158 // Set up to allow setting thread fields in constructor 159 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); 160 private static final long THREADLOCALS; 161 private static final long INHERITABLETHREADLOCALS; 162 private static final long INHERITEDACCESSCONTROLCONTEXT; 163 static { 164 try { 165 THREADLOCALS = U.objectFieldOffset 166 (Thread.class.getDeclaredField("threadLocals")); 167 INHERITABLETHREADLOCALS = U.objectFieldOffset 168 (Thread.class.getDeclaredField("inheritableThreadLocals")); 169 INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset 170 (Thread.class.getDeclaredField("inheritedAccessControlContext")); 171 } catch (ReflectiveOperationException e) { 172 throw new Error(e); 173 } 174 } 175 176 /** 177 * A worker thread that has no permissions, is not a member of any 178 * user-defined ThreadGroup, and erases all ThreadLocals after 179 * running each top-level task. 180 */ 181 static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread { 182 /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */ 183 private static final ThreadGroup innocuousThreadGroup = 184 createThreadGroup(); 185 186 /** An AccessControlContext supporting no privileges */ 187 private static final AccessControlContext INNOCUOUS_ACC = 188 new AccessControlContext( 189 new ProtectionDomain[] { 190 new ProtectionDomain(null, null) 191 }); 192 InnocuousForkJoinWorkerThread(ForkJoinPool pool)193 InnocuousForkJoinWorkerThread(ForkJoinPool pool) { 194 super(pool, innocuousThreadGroup, INNOCUOUS_ACC); 195 } 196 197 @Override // to erase ThreadLocals afterTopLevelExec()198 void afterTopLevelExec() { 199 eraseThreadLocals(); 200 } 201 202 @Override // to always report system loader getContextClassLoader()203 public ClassLoader getContextClassLoader() { 204 return ClassLoader.getSystemClassLoader(); 205 } 206 207 @Override // to silently fail setUncaughtExceptionHandler(UncaughtExceptionHandler x)208 public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { } 209 210 @Override // paranoically setContextClassLoader(ClassLoader cl)211 public void setContextClassLoader(ClassLoader cl) { 212 throw new SecurityException("setContextClassLoader"); 213 } 214 215 /** 216 * Returns a new group with the system ThreadGroup (the 217 * topmost, parent-less group) as parent. Uses Unsafe to 218 * traverse Thread.group and ThreadGroup.parent fields. 219 */ createThreadGroup()220 private static ThreadGroup createThreadGroup() { 221 try { 222 sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe(); 223 long tg = u.objectFieldOffset 224 (Thread.class.getDeclaredField("group")); 225 long gp = u.objectFieldOffset 226 (ThreadGroup.class.getDeclaredField("parent")); 227 ThreadGroup group = (ThreadGroup) 228 u.getObject(Thread.currentThread(), tg); 229 while (group != null) { 230 ThreadGroup parent = (ThreadGroup)u.getObject(group, gp); 231 if (parent == null) 232 return new ThreadGroup(group, 233 "InnocuousForkJoinWorkerThreadGroup"); 234 group = parent; 235 } 236 } catch (ReflectiveOperationException e) { 237 throw new Error(e); 238 } 239 // fall through if null as cannot-happen safeguard 240 throw new Error("Cannot create ThreadGroup"); 241 } 242 } 243 244 } 245