1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.Thread.UncaughtExceptionHandler; 39 import java.lang.invoke.MethodHandles; 40 import java.lang.invoke.VarHandle; 41 import java.security.AccessController; 42 import java.security.AccessControlContext; 43 import java.security.Permission; 44 import java.security.Permissions; 45 import java.security.PrivilegedAction; 46 import java.security.ProtectionDomain; 47 import java.util.ArrayList; 48 import java.util.Collection; 49 import java.util.Collections; 50 import java.util.List; 51 import java.util.function.Predicate; 52 import java.util.concurrent.locks.LockSupport; 53 54 /** 55 * An {@link ExecutorService} for running {@link ForkJoinTask}s. 56 * A {@code ForkJoinPool} provides the entry point for submissions 57 * from non-{@code ForkJoinTask} clients, as well as management and 58 * monitoring operations. 59 * 60 * <p>A {@code ForkJoinPool} differs from other kinds of {@link 61 * ExecutorService} mainly by virtue of employing 62 * <em>work-stealing</em>: all threads in the pool attempt to find and 63 * execute tasks submitted to the pool and/or created by other active 64 * tasks (eventually blocking waiting for work if none exist). This 65 * enables efficient processing when most tasks spawn other subtasks 66 * (as do most {@code ForkJoinTask}s), as well as when many small 67 * tasks are submitted to the pool from external clients. Especially 68 * when setting <em>asyncMode</em> to true in constructors, {@code 69 * ForkJoinPool}s may also be appropriate for use with event-style 70 * tasks that are never joined. All worker threads are initialized 71 * with {@link Thread#isDaemon} set {@code true}. 
72 * 73 * <p>A static {@link #commonPool()} is available and appropriate for 74 * most applications. The common pool is used by any ForkJoinTask that 75 * is not explicitly submitted to a specified pool. Using the common 76 * pool normally reduces resource usage (its threads are slowly 77 * reclaimed during periods of non-use, and reinstated upon subsequent 78 * use). 79 * 80 * <p>For applications that require separate or custom pools, a {@code 81 * ForkJoinPool} may be constructed with a given target parallelism 82 * level; by default, equal to the number of available processors. 83 * The pool attempts to maintain enough active (or available) threads 84 * by dynamically adding, suspending, or resuming internal worker 85 * threads, even if some tasks are stalled waiting to join others. 86 * However, no such adjustments are guaranteed in the face of blocked 87 * I/O or other unmanaged synchronization. The nested {@link 88 * ManagedBlocker} interface enables extension of the kinds of 89 * synchronization accommodated. The default policies may be 90 * overridden using a constructor with parameters corresponding to 91 * those documented in class {@link ThreadPoolExecutor}. 92 * 93 * <p>In addition to execution and lifecycle control methods, this 94 * class provides status check methods (for example 95 * {@link #getStealCount}) that are intended to aid in developing, 96 * tuning, and monitoring fork/join applications. Also, method 97 * {@link #toString} returns indications of pool state in a 98 * convenient form for informal monitoring. 99 * 100 * <p>As is the case with other ExecutorServices, there are three 101 * main task execution methods summarized in the following table. 102 * These are designed to be used primarily by clients not already 103 * engaged in fork/join computations in the current pool. 
The main 104 * forms of these methods accept instances of {@code ForkJoinTask}, 105 * but overloaded forms also allow mixed execution of plain {@code 106 * Runnable}- or {@code Callable}- based activities as well. However, 107 * tasks that are already executing in a pool should normally instead 108 * use the within-computation forms listed in the table unless using 109 * async event-style tasks that are not usually joined, in which case 110 * there is little difference among choice of methods. 111 * 112 * <table class="plain"> 113 * <caption>Summary of task execution methods</caption> 114 * <tr> 115 * <td></td> 116 * <th scope="col"> Call from non-fork/join clients</th> 117 * <th scope="col"> Call from within fork/join computations</th> 118 * </tr> 119 * <tr> 120 * <th scope="row" style="text-align:left"> Arrange async execution</th> 121 * <td> {@link #execute(ForkJoinTask)}</td> 122 * <td> {@link ForkJoinTask#fork}</td> 123 * </tr> 124 * <tr> 125 * <th scope="row" style="text-align:left"> Await and obtain result</th> 126 * <td> {@link #invoke(ForkJoinTask)}</td> 127 * <td> {@link ForkJoinTask#invoke}</td> 128 * </tr> 129 * <tr> 130 * <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th> 131 * <td> {@link #submit(ForkJoinTask)}</td> 132 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td> 133 * </tr> 134 * </table> 135 * 136 * <p>The parameters used to construct the common pool may be controlled by 137 * setting the following {@linkplain System#getProperty system properties}: 138 * <ul> 139 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism} 140 * - the parallelism level, a non-negative integer 141 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory} 142 * - the class name of a {@link ForkJoinWorkerThreadFactory}. 143 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 144 * is used to load this class. 
145 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler} 146 * - the class name of a {@link UncaughtExceptionHandler}. 147 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 148 * is used to load this class. 149 * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares} 150 * - the maximum number of allowed extra threads to maintain target 151 * parallelism (default 256). 152 * </ul> 153 * If no thread factory is supplied via a system property, then the 154 * common pool uses a factory that uses the system class loader as the 155 * {@linkplain Thread#getContextClassLoader() thread context class loader}. 156 * In addition, if a {@link SecurityManager} is present, then 157 * the common pool uses a factory supplying threads that have no 158 * {@link Permissions} enabled. 159 * 160 * Upon any error in establishing these settings, default parameters 161 * are used. It is possible to disable or limit the use of threads in 162 * the common pool by setting the parallelism property to zero, and/or 163 * using a factory that may return {@code null}. However doing so may 164 * cause unjoined tasks to never be executed. 165 * 166 * <p><b>Implementation notes</b>: This implementation restricts the 167 * maximum number of running threads to 32767. Attempts to create 168 * pools with greater than the maximum number result in 169 * {@code IllegalArgumentException}. 170 * 171 * <p>This implementation rejects submitted tasks (that is, by throwing 172 * {@link RejectedExecutionException}) only when the pool is shut down 173 * or internal resources have been exhausted. 
174 * 175 * @since 1.7 176 * @author Doug Lea 177 */ 178 @jdk.internal.vm.annotation.Contended 179 public class ForkJoinPool extends AbstractExecutorService { 180 181 /* 182 * Implementation Overview 183 * 184 * This class and its nested classes provide the main 185 * functionality and control for a set of worker threads: 186 * Submissions from non-FJ threads enter into submission queues. 187 * Workers take these tasks and typically split them into subtasks 188 * that may be stolen by other workers. Work-stealing based on 189 * randomized scans generally leads to better throughput than 190 * "work dealing" in which producers assign tasks to idle threads, 191 * in part because threads that have finished other tasks before 192 * the signalled thread wakes up (which can be a long time) can 193 * take the task instead. Preference rules give first priority to 194 * processing tasks from their own queues (LIFO or FIFO, depending 195 * on mode), then to randomized FIFO steals of tasks in other 196 * queues. This framework began as a vehicle for supporting 197 * tree-structured parallelism using work-stealing. Over time, 198 * its scalability advantages led to extensions and changes to 199 * better support more diverse usage contexts. Because most 200 * internal methods and nested classes are interrelated, their 201 * main rationale and descriptions are presented here; individual 202 * methods and nested classes contain only brief comments about 203 * details. 204 * 205 * WorkQueues 206 * ========== 207 * 208 * Most operations occur within work-stealing queues (in nested 209 * class WorkQueue). These are special forms of Deques that 210 * support only three of the four possible end-operations -- push, 211 * pop, and poll (aka steal), under the further constraints that 212 * push and pop are called only from the owning thread (or, as 213 * extended here, under a lock), while poll may be called from 214 * other threads. 
(If you are unfamiliar with them, you probably 215 * want to read Herlihy and Shavit's book "The Art of 216 * Multiprocessor programming", chapter 16 describing these in 217 * more detail before proceeding.) The main work-stealing queue 218 * design is roughly similar to those in the papers "Dynamic 219 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 220 * (http://research.sun.com/scalable/pubs/index.html) and 221 * "Idempotent work stealing" by Michael, Saraswat, and Vechev, 222 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). 223 * The main differences ultimately stem from GC requirements that 224 * we null out taken slots as soon as we can, to maintain as small 225 * a footprint as possible even in programs generating huge 226 * numbers of tasks. To accomplish this, we shift the CAS 227 * arbitrating pop vs poll (steal) from being on the indices 228 * ("base" and "top") to the slots themselves. 229 * 230 * Adding tasks then takes the form of a classic array push(task) 231 * in a circular buffer: 232 * q.array[q.top++ % length] = task; 233 * 234 * (The actual code needs to null-check and size-check the array, 235 * uses masking, not mod, for indexing a power-of-two-sized array, 236 * adds a release fence for publication, and possibly signals 237 * waiting workers to start scanning -- see below.) Both a 238 * successful pop and poll mainly entail a CAS of a slot from 239 * non-null to null. 240 * 241 * The pop operation (always performed by owner) is: 242 * if ((the task at top slot is not null) and 243 * (CAS slot to null)) 244 * decrement top and return task; 245 * 246 * And the poll operation (usually by a stealer) is 247 * if ((the task at base slot is not null) and 248 * (CAS slot to null)) 249 * increment base and return task; 250 * 251 * There are several variants of each of these. 
Most uses occur 252 * within operations that also interleave contention or emptiness 253 * tracking or inspection of elements before extracting them, so 254 * must interleave these with the above code. When performed by 255 * owner, getAndSet is used instead of CAS (see for example method 256 * nextLocalTask) which is usually more efficient, and possible 257 * because the top index cannot independently change during the 258 * operation. 259 * 260 * Memory ordering. See "Correct and Efficient Work-Stealing for 261 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 262 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an 263 * analysis of memory ordering requirements in work-stealing 264 * algorithms similar to (but different than) the one used here. 265 * Extracting tasks in array slots via (fully fenced) CAS provides 266 * primary synchronization. The base and top indices imprecisely 267 * guide where to extract from. We do not usually require strict 268 * orderings of array and index updates. Many index accesses use 269 * plain mode, with ordering constrained by surrounding context 270 * (usually with respect to element CASes or the two WorkQueue 271 * volatile fields source and phase). When not otherwise already 272 * constrained, reads of "base" by queue owners use acquire-mode, 273 * and some externally callable methods preface accesses with 274 * acquire fences. Additionally, to ensure that index update 275 * writes are not coalesced or postponed in loops etc, "opaque" 276 * mode is used in a few cases where timely writes are not 277 * otherwise ensured. The "locked" versions of push- and pop- 278 * based methods for shared queues differ from owned versions 279 * because locking already forces some of the ordering. 
280 * 281 * Because indices and slot contents cannot always be consistent, 282 * a check that base == top indicates (momentary) emptiness, but 283 * otherwise may err on the side of possibly making the queue 284 * appear nonempty when a push, pop, or poll have not fully 285 * committed, or making it appear empty when an update of top has 286 * not yet been visibly written. (Method isEmpty() checks the 287 * case of a partially completed removal of the last element.) 288 * Because of this, the poll operation, considered individually, 289 * is not wait-free. One thief cannot successfully continue until 290 * another in-progress one (or, if previously empty, a push) 291 * visibly completes. This can stall threads when required to 292 * consume from a given queue (see method poll()). However, in 293 * the aggregate, we ensure at least probabilistic 294 * non-blockingness. If an attempted steal fails, a scanning 295 * thief chooses a different random victim target to try next. So, 296 * in order for one thief to progress, it suffices for any 297 * in-progress poll or new push on any empty queue to complete. 298 * 299 * This approach also enables support of a user mode in which 300 * local task processing is in FIFO, not LIFO order, simply by 301 * using poll rather than pop. This can be useful in 302 * message-passing frameworks in which tasks are never joined. 303 * 304 * WorkQueues are also used in a similar way for tasks submitted 305 * to the pool. We cannot mix these tasks in the same queues used 306 * by workers. Instead, we randomly associate submission queues 307 * with submitting threads, using a form of hashing. The 308 * ThreadLocalRandom probe value serves as a hash code for 309 * choosing existing queues, and may be randomly repositioned upon 310 * contention with other submitters. In essence, submitters act 311 * like workers except that they are restricted to executing local 312 * tasks that they submitted. 
Insertion of tasks in shared mode 313 * requires a lock but we use only a simple spinlock (using field 314 * phase), because submitters encountering a busy queue move to a 315 * different position to use or create other queues -- they block 316 * only when creating and registering new queues. Because it is 317 * used only as a spinlock, unlocking requires only a "releasing" 318 * store (using setRelease) unless otherwise signalling. 319 * 320 * Management 321 * ========== 322 * 323 * The main throughput advantages of work-stealing stem from 324 * decentralized control -- workers mostly take tasks from 325 * themselves or each other, at rates that can exceed a billion 326 * per second. The pool itself creates, activates (enables 327 * scanning for and running tasks), deactivates, blocks, and 328 * terminates threads, all with minimal central information. 329 * There are only a few properties that we can globally track or 330 * maintain, so we pack them into a small number of variables, 331 * often maintaining atomicity without blocking or locking. 332 * Nearly all essentially atomic control state is held in a few 333 * volatile variables that are by far most often read (not 334 * written) as status and consistency checks. We pack as much 335 * information into them as we can. 336 * 337 * Field "ctl" contains 64 bits holding information needed to 338 * atomically decide to add, enqueue (on an event queue), and 339 * dequeue and release workers. To enable this packing, we 340 * restrict maximum parallelism to (1<<15)-1 (which is far in 341 * excess of normal operating range) to allow ids, counts, and 342 * their negations (used for thresholding) to fit into 16bit 343 * subfields. 344 * 345 * Field "mode" holds configuration parameters as well as lifetime 346 * status, atomically and monotonically setting SHUTDOWN, STOP, 347 * and finally TERMINATED bits. 348 * 349 * Field "workQueues" holds references to WorkQueues. 
It is 350 * updated (only during worker creation and termination) under 351 * lock (using field workerNamePrefix as lock), but is otherwise 352 * concurrently readable, and accessed directly. We also ensure 353 * that uses of the array reference itself never become too stale 354 * in case of resizing, by arranging that (re-)reads are separated 355 * by at least one acquiring read access. To simplify index-based 356 * operations, the array size is always a power of two, and all 357 * readers must tolerate null slots. Worker queues are at odd 358 * indices. Shared (submission) queues are at even indices, up to 359 * a maximum of 64 slots, to limit growth even if the array needs 360 * to expand to add more workers. Grouping them together in this 361 * way simplifies and speeds up task scanning. 362 * 363 * All worker thread creation is on-demand, triggered by task 364 * submissions, replacement of terminated workers, and/or 365 * compensation for blocked workers. However, all other support 366 * code is set up to work with other policies. To ensure that we 367 * do not hold on to worker references that would prevent GC, all 368 * accesses to workQueues are via indices into the workQueues 369 * array (which is one source of some of the messy code 370 * constructions here). In essence, the workQueues array serves as 371 * a weak reference mechanism. Thus for example the stack top 372 * subfield of ctl stores indices, not references. 373 * 374 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we 375 * cannot let workers spin indefinitely scanning for tasks when 376 * none can be found immediately, and we cannot start/resume 377 * workers unless there appear to be tasks available. On the 378 * other hand, we must quickly prod them into action when new 379 * tasks are submitted or generated. In many usages, ramp-up time 380 * is the main limiting factor in overall performance, which is 381 * compounded at program start-up by JIT compilation and 382 * allocation. 
So we streamline this as much as possible. 383 * 384 * The "ctl" field atomically maintains total worker and 385 * "released" worker counts, plus the head of the available worker 386 * queue (actually stack, represented by the lower 32bit subfield 387 * of ctl). Released workers are those known to be scanning for 388 * and/or running tasks. Unreleased ("available") workers are 389 * recorded in the ctl stack. These workers are made available for 390 * signalling by enqueuing in ctl (see method runWorker). The 391 * "queue" is a form of Treiber stack. This is ideal for 392 * activating threads in most-recently used order, and improves 393 * performance and locality, outweighing the disadvantages of 394 * being prone to contention and inability to release a worker 395 * unless it is topmost on stack. To avoid missed signal problems 396 * inherent in any wait/signal design, available workers rescan 397 * for (and if found run) tasks after enqueuing. Normally their 398 * release status will be updated while doing so, but the released 399 * worker ctl count may underestimate the number of active 400 * threads. (However, it is still possible to determine quiescence 401 * via a validation traversal -- see isQuiescent). After an 402 * unsuccessful rescan, available workers are blocked until 403 * signalled (see signalWork). The top stack state holds the 404 * value of the "phase" field of the worker: its index and status, 405 * plus a version counter that, in addition to the count subfields 406 * (also serving as version stamps) provide protection against 407 * Treiber stack ABA effects. 408 * 409 * Creating workers. To create a worker, we pre-increment counts 410 * (serving as a reservation), and attempt to construct a 411 * ForkJoinWorkerThread via its factory. Upon construction, the 412 * new thread invokes registerWorker, where it constructs a 413 * WorkQueue and is assigned an index in the workQueues array 414 * (expanding the array if necessary). 
The thread is then started. 415 * Upon any exception across these steps, or null return from 416 * factory, deregisterWorker adjusts counts and records 417 * accordingly. If a null return, the pool continues running with 418 * fewer than the target number of workers. If exceptional, the 419 * exception is propagated, generally to some external caller. 420 * Worker index assignment avoids the bias in scanning that would 421 * occur if entries were sequentially packed starting at the front 422 * of the workQueues array. We treat the array as a simple 423 * power-of-two hash table, expanding as needed. The seedIndex 424 * increment ensures no collisions until a resize is needed or a 425 * worker is deregistered and replaced, and thereafter keeps 426 * probability of collision low. We cannot use 427 * ThreadLocalRandom.getProbe() for similar purposes here because 428 * the thread has not started yet, but do so for creating 429 * submission queues for existing external threads (see 430 * externalPush). 431 * 432 * WorkQueue field "phase" is used by both workers and the pool to 433 * manage and track whether a worker is UNSIGNALLED (possibly 434 * blocked waiting for a signal). When a worker is enqueued its 435 * phase field is set. Note that phase field updates lag queue CAS 436 * releases so usage requires care -- seeing a negative phase does 437 * not guarantee that the worker is available. When queued, the 438 * lower 16 bits of its phase must hold its pool index. So we 439 * place the index there upon initialization and otherwise keep it 440 * there or restore it when necessary. 441 * 442 * The ctl field also serves as the basis for memory 443 * synchronization surrounding activation. This uses a more 444 * efficient version of a Dekker-like rule that task producers and 445 * consumers sync with each other by both writing/CASing ctl (even 446 * if to its current value). This would be extremely costly. 
So 447 * we relax it in several ways: (1) Producers only signal when 448 * their queue is possibly empty at some point during a push 449 * operation (which requires conservatively checking size zero or 450 * one to cover races). (2) Other workers propagate this signal 451 * when they find tasks in a queue with size greater than one. (3) 452 * Workers only enqueue after scanning (see below) and not finding 453 * any tasks. (4) Rather than CASing ctl to its current value in 454 * the common case where no action is required, we reduce write 455 * contention by equivalently prefacing signalWork when called by 456 * an external task producer using a memory access with 457 * full-volatile semantics or a "fullFence". 458 * 459 * Almost always, too many signals are issued, in part because a 460 * task producer cannot tell if some existing worker is in the 461 * midst of finishing one task (or already scanning) and ready to 462 * take another without being signalled. So the producer might 463 * instead activate a different worker that does not find any 464 * work, and then inactivates. This scarcely matters in 465 * steady-state computations involving all workers, but can create 466 * contention and bookkeeping bottlenecks during ramp-up, 467 * ramp-down, and small computations involving only a few workers. 468 * 469 * Scanning. Method scan (from runWorker) performs top-level 470 * scanning for tasks. (Similar scans appear in helpQuiesce and 471 * pollScan.) Each scan traverses and tries to poll from each 472 * queue starting at a random index. Scans are not performed in 473 * ideal random permutation order, to reduce cacheline 474 * contention. The pseudorandom generator need not have 475 * high-quality statistical properties in the long term, but just 476 * within computations; we use Marsaglia XorShifts (often via 477 * ThreadLocalRandom.nextSecondarySeed), which are cheap and 478 * suffice. 
Scanning also includes contention reduction: When 479 * scanning workers fail to extract an apparently existing task, 480 * they soon restart at a different pseudorandom index. This form 481 * of backoff improves throughput when many threads are trying to 482 * take tasks from few queues, which can be common in some usages. 483 * Scans do not otherwise explicitly take into account core 484 * affinities, loads, cache localities, etc. However, they do 485 * exploit temporal locality (which usually approximates these) by 486 * preferring to re-poll from the same queue after a successful 487 * poll before trying others (see method topLevelExec). However 488 * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard 489 * against infinitely unfair looping under unbounded user task 490 * recursion, and also to reduce long-term contention when many 491 * threads poll few queues holding many small tasks. The bound is 492 * high enough to avoid much impact on locality and scheduling 493 * overhead. 494 * 495 * Trimming workers. To release resources after periods of lack of 496 * use, a worker starting to wait when the pool is quiescent will 497 * time out and terminate (see method runWorker) if the pool has 498 * remained quiescent for the period given by field keepAlive. 499 * 500 * Shutdown and Termination. A call to shutdownNow invokes 501 * tryTerminate to atomically set a runState bit. The calling 502 * thread, as well as every other worker thereafter terminating, 503 * helps terminate others by cancelling their unprocessed tasks, 504 * and waking them up, doing so repeatedly until stable. Calls to 505 * non-abrupt shutdown() preface this by checking whether 506 * termination should commence by sweeping through queues (until 507 * stable) to ensure lack of in-flight submissions and workers 508 * about to process them before triggering the "STOP" phase of 509 * termination. 
510 * 511 * Joining Tasks 512 * ============= 513 * 514 * Any of several actions may be taken when one worker is waiting 515 * to join a task stolen (or always held) by another. Because we 516 * are multiplexing many tasks on to a pool of workers, we can't 517 * always just let them block (as in Thread.join). We also cannot 518 * just reassign the joiner's run-time stack with another and 519 * replace it later, which would be a form of "continuation", that 520 * even if possible is not necessarily a good idea since we may 521 * need both an unblocked task and its continuation to progress. 522 * Instead we combine two tactics: 523 * 524 * Helping: Arranging for the joiner to execute some task that it 525 * would be running if the steal had not occurred. 526 * 527 * Compensating: Unless there are already enough live threads, 528 * method tryCompensate() may create or re-activate a spare 529 * thread to compensate for blocked joiners until they unblock. 530 * 531 * A third form (implemented in tryRemoveAndExec) amounts to 532 * helping a hypothetical compensator: If we can readily tell that 533 * a possible action of a compensator is to steal and execute the 534 * task being joined, the joining thread can do so directly, 535 * without the need for a compensation thread. 536 * 537 * The ManagedBlocker extension API can't use helping so relies 538 * only on compensation in method awaitBlocker. 539 * 540 * The algorithm in awaitJoin entails a form of "linear helping". 541 * Each worker records (in field source) the id of the queue from 542 * which it last stole a task. The scan in method awaitJoin uses 543 * these markers to try to find a worker to help (i.e., steal back 544 * a task from and execute it) that could hasten completion of the 545 * actively joined task. Thus, the joiner executes a task that 546 * would be on its own local deque if the to-be-joined task had 547 * not been stolen. 
This is a conservative variant of the approach 548 * described in Wagner & Calder "Leapfrogging: a portable 549 * technique for implementing efficient futures" SIGPLAN Notices, 550 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs 551 * mainly in that we only record queue ids, not full dependency 552 * links. This requires a linear scan of the workQueues array to 553 * locate stealers, but isolates cost to when it is needed, rather 554 * than adding to per-task overhead. Searches can fail to locate 555 * stealers when GC stalls and the like delay recording sources. 556 * Further, even when accurately identified, stealers might not 557 * ever produce a task that the joiner can in turn help with. So, 558 * compensation is tried upon failure to find tasks to run. 559 * 560 * Compensation does not by default aim to keep exactly the target 561 * parallelism number of unblocked threads running at any given 562 * time. Some previous versions of this class employed immediate 563 * compensations for any blocked join. However, in practice, the 564 * vast majority of blockages are transient byproducts of GC and 565 * other JVM or OS activities that are made worse by replacement 566 * when they cause longer-term oversubscription. Rather than 567 * impose arbitrary policies, we allow users to override the 568 * default of only adding threads upon apparent starvation. The 569 * compensation mechanism may also be bounded. Bounds for the 570 * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope 571 * with programming errors and abuse before running out of 572 * resources to do so. 573 * 574 * Common Pool 575 * =========== 576 * 577 * The static common pool always exists after static 578 * initialization. Since it (or any other created pool) need 579 * never be used, we minimize initial construction overhead and 580 * footprint to the setup of about a dozen fields. 
581 * 582 * When external threads submit to the common pool, they can 583 * perform subtask processing (see externalHelpComplete and 584 * related methods) upon joins. This caller-helps policy makes it 585 * sensible to set common pool parallelism level to one (or more) 586 * less than the total number of available cores, or even zero for 587 * pure caller-runs. We do not need to record whether external 588 * submissions are to the common pool -- if not, external help 589 * methods return quickly. These submitters would otherwise be 590 * blocked waiting for completion, so the extra effort (with 591 * liberally sprinkled task status checks) in inapplicable cases 592 * amounts to an odd form of limited spin-wait before blocking in 593 * ForkJoinTask.join. 594 * 595 * As a more appropriate default in managed environments, unless 596 * overridden by system properties, we use workers of subclass 597 * InnocuousForkJoinWorkerThread when there is a SecurityManager 598 * present. These workers have no permissions set, do not belong 599 * to any user-defined ThreadGroup, and erase all ThreadLocals 600 * after executing any top-level task (see 601 * WorkQueue.afterTopLevelExec). The associated mechanics (mainly 602 * in ForkJoinWorkerThread) may be JVM-dependent and must access 603 * particular Thread class fields to achieve this effect. 604 * 605 * Memory placement 606 * ================ 607 * 608 * Performance can be very sensitive to placement of instances of 609 * ForkJoinPool and WorkQueues and their queue arrays. To reduce 610 * false-sharing impact, the @Contended annotation isolates 611 * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl 612 * field. WorkQueue arrays are allocated (by their threads) with 613 * larger initial sizes than most ever need, mostly to reduce 614 * false sharing with current garbage collectors that use cardmark 615 * tables. 616 * 617 * Style notes 618 * =========== 619 * 620 * Memory ordering relies mainly on VarHandles. 
This can be 621 * awkward and ugly, but also reflects the need to control 622 * outcomes across the unusual cases that arise in very racy code 623 * with very few invariants. All fields are read into locals 624 * before use, and null-checked if they are references. Array 625 * accesses using masked indices include checks (that are always 626 * true) that the array length is non-zero to avoid compilers 627 * inserting more expensive traps. This is usually done in a 628 * "C"-like style of listing declarations at the heads of methods 629 * or blocks, and using inline assignments on first encounter. 630 * Nearly all explicit checks lead to bypass/return, not exception 631 * throws, because they may legitimately arise due to 632 * cancellation/revocation during shutdown. 633 * 634 * There is a lot of representation-level coupling among classes 635 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The 636 * fields of WorkQueue maintain data structures managed by 637 * ForkJoinPool, so are directly accessed. There is little point 638 * trying to reduce this, since any associated future changes in 639 * representations will need to be accompanied by algorithmic 640 * changes anyway. Several methods intrinsically sprawl because 641 * they must accumulate sets of consistent reads of fields held in 642 * local variables. Some others are artificially broken up to 643 * reduce producer/consumer imbalances due to dynamic compilation. 644 * There are also other coding oddities (including several 645 * unnecessary-looking hoisted null checks) that help some methods 646 * perform reasonably even when interpreted (not compiled). 
647 * 648 * The order of declarations in this file is (with a few exceptions): 649 * (1) Static utility functions 650 * (2) Nested (static) classes 651 * (3) Static fields 652 * (4) Fields, along with constants used when unpacking some of them 653 * (5) Internal control methods 654 * (6) Callbacks and other support for ForkJoinTask methods 655 * (7) Exported methods 656 * (8) Static block initializing statics in minimally dependent order 657 */ 658 659 // Static utilities 660 661 /** 662 * If there is a security manager, makes sure caller has 663 * permission to modify threads. 664 */ checkPermission()665 private static void checkPermission() { 666 SecurityManager security = System.getSecurityManager(); 667 if (security != null) 668 security.checkPermission(modifyThreadPermission); 669 } 670 671 // Nested classes 672 673 /** 674 * Factory for creating new {@link ForkJoinWorkerThread}s. 675 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 676 * for {@code ForkJoinWorkerThread} subclasses that extend base 677 * functionality or initialize threads with different contexts. 678 */ 679 public static interface ForkJoinWorkerThreadFactory { 680 /** 681 * Returns a new worker thread operating in the given pool. 682 * Returning null or throwing an exception may result in tasks 683 * never being executed. If this method throws an exception, 684 * it is relayed to the caller of the method (for example 685 * {@code execute}) causing attempted thread creation. If this 686 * method returns null or throws an exception, it is not 687 * retried until the next attempted creation (for example 688 * another call to {@code execute}). 
689 * 690 * @param pool the pool this thread works in 691 * @return the new worker thread, or {@code null} if the request 692 * to create a thread is rejected 693 * @throws NullPointerException if the pool is null 694 */ newThread(ForkJoinPool pool)695 public ForkJoinWorkerThread newThread(ForkJoinPool pool); 696 } 697 contextWithPermissions(Permission .... perms)698 static AccessControlContext contextWithPermissions(Permission ... perms) { 699 Permissions permissions = new Permissions(); 700 for (Permission perm : perms) 701 permissions.add(perm); 702 return new AccessControlContext( 703 new ProtectionDomain[] { new ProtectionDomain(null, permissions) }); 704 } 705 706 /** 707 * Default ForkJoinWorkerThreadFactory implementation; creates a 708 * new ForkJoinWorkerThread using the system class loader as the 709 * thread context class loader. 710 */ 711 private static final class DefaultForkJoinWorkerThreadFactory 712 implements ForkJoinWorkerThreadFactory { 713 private static final AccessControlContext ACC = contextWithPermissions( 714 new RuntimePermission("getClassLoader"), 715 new RuntimePermission("setContextClassLoader")); 716 newThread(ForkJoinPool pool)717 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 718 return AccessController.doPrivileged( 719 new PrivilegedAction<>() { 720 public ForkJoinWorkerThread run() { 721 return new ForkJoinWorkerThread( 722 pool, ClassLoader.getSystemClassLoader()); }}, 723 ACC); 724 } 725 } 726 727 // Constants shared across ForkJoinPool and WorkQueue 728 729 // Bounds 730 static final int SWIDTH = 16; // width of short 731 static final int SMASK = 0xffff; // short bits == max index 732 static final int MAX_CAP = 0x7fff; // max #workers - 1 733 static final int SQMASK = 0x007e; // max 64 (even) slots 734 735 // Masks and units for WorkQueue.phase and ctl sp subfield 736 static final int UNSIGNALLED = 1 << 31; // must be negative 737 static final int SS_SEQ = 1 << 16; // version count 738 static final int QLOCK 
= 1; // must be 1 739 740 // Mode bits and sentinels, some also used in WorkQueue id and.source fields 741 static final int OWNED = 1; // queue has owner thread 742 static final int FIFO = 1 << 16; // fifo queue or access mode 743 static final int SHUTDOWN = 1 << 18; 744 static final int TERMINATED = 1 << 19; 745 static final int STOP = 1 << 31; // must be negative 746 static final int QUIET = 1 << 30; // not scanning or working 747 static final int DORMANT = QUIET | UNSIGNALLED; 748 749 /** 750 * Initial capacity of work-stealing queue array. 751 * Must be a power of two, at least 2. 752 */ 753 static final int INITIAL_QUEUE_CAPACITY = 1 << 13; 754 755 /** 756 * Maximum capacity for queue arrays. Must be a power of two less 757 * than or equal to 1 << (31 - width of array entry) to ensure 758 * lack of wraparound of index calculations, but defined to a 759 * value a bit less than this to help users trap runaway programs 760 * before saturating systems. 761 */ 762 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M 763 764 /** 765 * The maximum number of top-level polls per worker before 766 * checking other queues, expressed as a bit shift to, in effect, 767 * multiply by pool size, and then use as random value mask, so 768 * average bound is about poolSize*(1<<TOP_BOUND_SHIFT). See 769 * above for rationale. 770 */ 771 static final int TOP_BOUND_SHIFT = 10; 772 773 /** 774 * Queues supporting work-stealing as well as external task 775 * submission. See above for descriptions and algorithms. 
*/
@jdk.internal.vm.annotation.Contended
static final class WorkQueue {
    volatile int source;       // source queue id, or sentinel
    int id;                    // pool index, mode, tag
    int base;                  // index of next slot for poll
    int top;                   // index of next slot for push
    volatile int phase;        // versioned, negative: queued, 1: locked
    int stackPred;             // pool stack (ctl) predecessor link
    int nsteals;               // number of steals
    ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
    final ForkJoinPool pool;   // the containing pool (may be null)
    final ForkJoinWorkerThread owner; // owning thread or null if shared

    /**
     * Creates a queue for the given pool and owner. The task array
     * is not allocated here; only the indices are initialized.
     *
     * @param pool the containing pool (may be null)
     * @param owner the owning thread, or null if shared
     */
    WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
        this.pool = pool;
        this.owner = owner;
        // Place indices in the center of array (that is not yet allocated)
        base = top = INITIAL_QUEUE_CAPACITY >>> 1;
    }

    /**
     * Tries to lock shared queue by CASing phase field.
     *
     * @return true if phase was changed from 0 to 1
     */
    final boolean tryLockPhase() {
        return PHASE.compareAndSet(this, 0, 1);
    }

    /**
     * Unlocks by writing 0 to the phase field in release mode.
     */
    final void releasePhaseLock() {
        PHASE.setRelease(this, 0);
    }

    /**
     * Returns an exportable index (used by ForkJoinWorkerThread).
     */
    final int getPoolIndex() {
        return (id & 0xffff) >>> 1; // ignore odd/even tag bit
    }

    /**
     * Returns the approximate number of tasks in the queue.
     */
    final int queueSize() {
        // n is base - top, so it is negative when tasks are present
        int n = (int)BASE.getAcquire(this) - top;
        return (n >= 0) ? 0 : -n; // ignore transient negative
    }

    /**
     * Provides a more accurate estimate of whether this queue has
     * any tasks than does queueSize, by checking whether a
     * near-empty queue has at least one unclaimed task.
     */
    final boolean isEmpty() {
        ForkJoinTask<?>[] a; int n, cap, b;
        VarHandle.acquireFence(); // needed by external callers
        return ((n = (b = base) - top) >= 0 || // possibly one task
                (n == -1 && ((a = array) == null ||
                             (cap = a.length) == 0 ||
                             a[(cap - 1) & b] == null)));
    }

    /**
     * Pushes a task. Call only by owner in unshared queues.
     * Does nothing if the array is null or empty.
     *
     * @param task the task. Caller must ensure non-null.
     * @throws RejectedExecutionException if array cannot be resized
     */
    final void push(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a;
        int s = top, d, cap, m;
        ForkJoinPool p = pool;
        if ((a = array) != null && (cap = a.length) > 0) {
            QA.setRelease(a, (m = cap - 1) & s, task);
            top = s + 1;
            // d is the number of tasks that were queued before this push
            if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
                p != null) {                 // size 0 or 1
                VarHandle.fullFence();
                p.signalWork();
            }
            else if (d == m)                 // d + 1 == cap: array is full
                growArray(false);
        }
    }

    /**
     * Version of push for shared queues. Call only with phase lock held.
     * The lock is released before returning whenever the array is
     * usable: either here (phase = 0) or by growArray(true).
     *
     * @param task the task to push
     * @return true if should signal work
     */
    final boolean lockedPush(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a;
        boolean signal = false;
        int s = top, b = base, cap, d;
        if ((a = array) != null && (cap = a.length) > 0) {
            a[(cap - 1) & s] = task;
            top = s + 1;
            if (b - s + cap - 1 == 0)        // s - b == cap - 1: now full
                growArray(true);
            else {
                phase = 0; // full volatile unlock
                if (((s - base) & ~1) == 0) // size 0 or 1
                    signal = true;
            }
        }
        return signal;
    }

    /**
     * Doubles the capacity of array. Call either by owner or with
     * lock held -- it is OK for base, but not top, to move while
     * resizings are in progress.
     *
     * @param locked if true, phase is reset to 0 (unlocking) on the
     *        way out, whether or not the resize succeeded
     * @throws RejectedExecutionException if no new array was
     *         installed (doubling would exceed MAXIMUM_QUEUE_CAPACITY,
     *         or allocation failed with OutOfMemoryError)
     */
    final void growArray(boolean locked) {
        ForkJoinTask<?>[] newA = null;
        try {
            ForkJoinTask<?>[] oldA; int oldSize, newSize;
            if ((oldA = array) != null && (oldSize = oldA.length) > 0 &&
                (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
                newSize > 0) {
                try {
                    newA = new ForkJoinTask<?>[newSize];
                } catch (OutOfMemoryError ex) {
                    // deliberately ignored: newA stays null, which is
                    // reported as RejectedExecutionException below
                }
                if (newA != null) { // poll from old array, push to new
                    int oldMask = oldSize - 1, newMask = newSize - 1;
                    // walk down from top; stop at first already-taken slot
                    for (int s = top - 1, k = oldMask; k >= 0; --k) {
                        ForkJoinTask<?> x = (ForkJoinTask<?>)
                            QA.getAndSet(oldA, s & oldMask, null);
                        if (x != null)
                            newA[s-- & newMask] = x;
                        else
                            break;
                    }
                    array = newA;
                    VarHandle.releaseFence();
                }
            }
        } finally {
            if (locked)
                phase = 0;
        }
        if (newA == null)
            throw new RejectedExecutionException("Queue capacity exceeded");
    }

    /**
     * Takes next task, if one exists, in FIFO order.
     *
     * @return the task taken from the base slot, or null if the
     *         queue appears empty or has no usable array
     */
    final ForkJoinTask<?> poll() {
        int b, k, cap; ForkJoinTask<?>[] a;
        while ((a = array) != null && (cap = a.length) > 0 &&
               top - (b = base) > 0) {
            ForkJoinTask<?> t = (ForkJoinTask<?>)
                QA.getAcquire(a, k = (cap - 1) & b);
            if (base == b++) {               // base unchanged since read
                if (t == null)
                    Thread.yield(); // await index advance
                else if (QA.compareAndSet(a, k, t, null)) {
                    BASE.setOpaque(this, b);
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Takes next task, if one exists, in order specified by mode.
     * The top slot is used when the FIFO bit of id is clear or
     * exactly one task is present; otherwise the base slot is used.
     */
    final ForkJoinTask<?> nextLocalTask() {
        ForkJoinTask<?> t = null;
        int md = id, b, s, d, cap; ForkJoinTask<?>[] a;
        if ((a = array) != null && (cap = a.length) > 0 &&
            (d = (s = top) - (b = base)) > 0) {
            if ((md & FIFO) == 0 || d == 1) {
                if ((t = (ForkJoinTask<?>)
                     QA.getAndSet(a, (cap - 1) & --s, null)) != null)
                    TOP.setOpaque(this, s);
            }
            else if ((t = (ForkJoinTask<?>)
                      QA.getAndSet(a, (cap - 1) & b++, null)) != null) {
                BASE.setOpaque(this, b);
            }
            else // on contention in FIFO mode, use regular poll
                t = poll();
        }
        return t;
    }

    /**
     * Returns next task, if one exists, in order specified by mode.
     * Reads the slot without removing the task.
     */
    final ForkJoinTask<?> peek() {
        int cap; ForkJoinTask<?>[] a;
        return ((a = array) != null && (cap = a.length) > 0) ?
            a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
    }

    /**
     * Pops the given task only if it is at the current top.
     *
     * @param task the task expected in the top slot
     * @return true if the task was removed
     */
    final boolean tryUnpush(ForkJoinTask<?> task) {
        boolean popped = false;
        int s, cap; ForkJoinTask<?>[] a;
        if ((a = array) != null && (cap = a.length) > 0 &&
            (s = top) != base &&
            (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
            TOP.setOpaque(this, s);
        return popped;
    }

    /**
     * Shared version of tryUnpush. Takes the phase lock, then
     * rechecks top and array before removing the task.
     *
     * @param task the task expected in the top slot
     * @return true if the task was removed
     */
    final boolean tryLockedUnpush(ForkJoinTask<?> task) {
        boolean popped = false;
        int s = top - 1, k, cap; ForkJoinTask<?>[] a;
        if ((a = array) != null && (cap = a.length) > 0 &&
            a[k = (cap - 1) & s] == task && tryLockPhase()) {
            if (top == s + 1 && array == a &&
                (popped = QA.compareAndSet(a, k, task, null)))
                top = s;
            releasePhaseLock();
        }
        return popped;
    }

    /**
     * Removes and cancels all known tasks, ignoring any exceptions.
     */
    final void cancelAll() {
        for (ForkJoinTask<?> t; (t = poll()) != null; )
            ForkJoinTask.cancelIgnoringExceptions(t);
    }

    // Specialized execution methods

    /**
     * Runs the given (stolen) task if nonnull, as well as
     * remaining local tasks and others available from the given
     * queue, up to bound n (to avoid infinite unfairness).
     * Afterwards adds the number of tasks taken from q to nsteals,
     * resets source to 0, and invokes the owner's afterTopLevelExec.
     *
     * @param t the stolen task
     * @param q the queue the task was taken from
     * @param n bound on the number of further tasks to run
     */
    final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
        if (t != null && q != null) { // hoist checks
            int nstolen = 1;
            for (;;) {
                t.doExec();
                if (n-- < 0)
                    break;
                else if ((t = nextLocalTask()) == null) {
                    if ((t = q.poll()) == null)
                        break;
                    else
                        ++nstolen;
                }
            }
            ForkJoinWorkerThread thread = owner;
            nsteals += nstolen;
            source = 0;
            if (thread != null)
                thread.afterTopLevelExec();
        }
    }

    /**
     * If present, removes task from queue and executes it.
     * Scans downward from the top slot and stops at the first
     * null slot.
     *
     * @param task the task to look for
     */
    final void tryRemoveAndExec(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a; int s, cap;
        if ((a = array) != null && (cap = a.length) > 0 &&
            (s = top) - base > 0) { // traverse from top
            for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
                int index = i & m;
                ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index);
                if (t == null)
                    break;
                else if (t == task) {
                    if (QA.compareAndSet(a, index, t, null)) {
                        top = ns; // safely shift down
                        // move each task above the hole down one slot
                        for (int j = i; j != ns; ++j) {
                            ForkJoinTask<?> f;
                            int pindex = (j + 1) & m;
                            f = (ForkJoinTask<?>)QA.get(a, pindex);
                            QA.setVolatile(a, pindex, null);
                            int jindex = j & m;
                            QA.setRelease(a, jindex, f);
                        }
                        VarHandle.releaseFence();
                        t.doExec();
                    }
                    break;
                }
            }
        }
    }

    /**
     * Tries to pop and run tasks within the target's computation
     * until done, not found, or limit exceeded.
     *
     * @param task root of CountedCompleter computation
     * @param limit max runs, or zero for no limit
     * @param shared true if must lock to extract task
     * @return task status on exit
     */
    final int helpCC(CountedCompleter<?> task, int limit, boolean shared) {
        int status = 0;
        if (task != null && (status = task.status) >= 0) {
            int s, k, cap; ForkJoinTask<?>[] a;
            while ((a = array) != null && (cap = a.length) > 0 &&
                   (s = top) - base > 0) {
                CountedCompleter<?> v = null;
                ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)];
                if (o instanceof CountedCompleter) {
                    CountedCompleter<?> t = (CountedCompleter<?>)o;
                    // follow the completer chain from t looking for task
                    for (CountedCompleter<?> f = t;;) {
                        if (f != task) {
                            if ((f = f.completer) == null)
                                break;
                        }
                        else if (shared) {
                            if (tryLockPhase()) {
                                if (top == s && array == a &&
                                    QA.compareAndSet(a, k, t, null)) {
                                    top = s - 1;
                                    v = t;
                                }
                                releasePhaseLock();
                            }
                            break;
                        }
                        else {
                            if (QA.compareAndSet(a, k, t, null)) {
                                top = s - 1;
                                v = t;
                            }
                            break;
                        }
                    }
                }
                if (v != null)
                    v.doExec();
                if ((status = task.status) < 0 || v == null ||
                    (limit != 0 && --limit == 0))
                    break;
            }
        }
        return status;
    }

    /**
     * Tries to poll and run AsynchronousCompletionTasks until
     * none found or blocker is released
     *
     * @param blocker the blocker
     */
    final void helpAsyncBlocker(ManagedBlocker blocker) {
        if (blocker != null) {
            int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            while ((a = array) != null && (cap = a.length) > 0 &&
                   top - (b = base) > 0) {
                t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
                if (blocker.isReleasable())
                    break;
                else if (base == b++ && t != null) {
                    // stop at the first task of any other kind
                    if (!(t instanceof CompletableFuture.
                          AsynchronousCompletionTask))
                        break;
                    else if (QA.compareAndSet(a, k, t, null)) {
                        BASE.setOpaque(this, b);
                        t.doExec();
                    }
                }
            }
        }
    }

    /**
     * Returns true if owned and not known to be blocked.
     * "Blocked" here means the owner's thread state is BLOCKED,
     * WAITING, or TIMED_WAITING.
     */
    final boolean isApparentlyUnblocked() {
        Thread wt; Thread.State s;
        return ((wt = owner) != null &&
                (s = wt.getState()) != Thread.State.BLOCKED &&
                s != Thread.State.WAITING &&
                s != Thread.State.TIMED_WAITING);
    }

    // VarHandle mechanics.
    static final VarHandle PHASE;
    static final VarHandle BASE;
    static final VarHandle TOP;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
            BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
            TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}

// static fields (initialized in static initializer below)

/**
 * Creates a new ForkJoinWorkerThread. This factory is used unless
 * overridden in ForkJoinPool constructors.
 */
public static final ForkJoinWorkerThreadFactory
    defaultForkJoinWorkerThreadFactory;

/**
 * Permission required for callers of methods that may start or
 * kill threads.
 */
static final RuntimePermission modifyThreadPermission;

/**
 * Common (static) pool. Non-null for public use unless a static
 * construction exception, but internal usages null-check on use
 * to paranoically avoid potential initialization circularities
 * as well as to simplify generated code.
 */
static final ForkJoinPool common;

/**
 * Common pool parallelism.
To allow simpler use and management 1204 * when common pool threads are disabled, we allow the underlying 1205 * common.parallelism field to be zero, but in that case still report 1206 * parallelism as 1 to reflect resulting caller-runs mechanics. 1207 */ 1208 static final int COMMON_PARALLELISM; 1209 1210 /** 1211 * Limit on spare thread construction in tryCompensate. 1212 */ 1213 private static final int COMMON_MAX_SPARES; 1214 1215 /** 1216 * Sequence number for creating workerNamePrefix. 1217 */ 1218 private static int poolNumberSequence; 1219 1220 /** 1221 * Returns the next sequence number. We don't expect this to 1222 * ever contend, so use simple builtin sync. 1223 */ 1224 private static final synchronized int nextPoolId() { 1225 return ++poolNumberSequence; 1226 } 1227 1228 // static configuration constants 1229 1230 /** 1231 * Default idle timeout value (in milliseconds) for the thread 1232 * triggering quiescence to park waiting for new work 1233 */ 1234 private static final long DEFAULT_KEEPALIVE = 60_000L; 1235 1236 /** 1237 * Undershoot tolerance for idle timeouts 1238 */ 1239 private static final long TIMEOUT_SLOP = 20L; 1240 1241 /** 1242 * The default value for COMMON_MAX_SPARES. Overridable using the 1243 * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system 1244 * property. The default value is far in excess of normal 1245 * requirements, but also far short of MAX_CAP and typical OS 1246 * thread limits, so allows JVMs to catch misuse/abuse before 1247 * running out of resources needed to do so. 1248 */ 1249 private static final int DEFAULT_COMMON_MAX_SPARES = 256; 1250 1251 /** 1252 * Increment for seed generators. See class ThreadLocal for 1253 * explanation. 
*/
private static final int SEED_INCREMENT = 0x9e3779b9;

/*
 * Bits and masks for field ctl, packed with 4 16 bit subfields:
 * RC: Number of released (unqueued) workers minus target parallelism
 * TC: Number of total workers minus target parallelism
 * SS: version count and status of top waiting thread
 * ID: poolIndex of top of Treiber stack of waiters
 *
 * When convenient, we can extract the lower 32 stack top bits
 * (including version bits) as sp=(int)ctl. The offsets of counts
 * by the target parallelism and the positionings of fields makes
 * it possible to perform the most common checks via sign tests of
 * fields: When rc is negative, there are not enough unqueued
 * workers, when tc is negative, there are not enough total
 * workers. When sp is non-zero, there are waiting workers. To
 * deal with possibly negative fields, we use casts in and out of
 * "short" and/or signed shifts to maintain signedness.
 *
 * Because it occupies uppermost bits, we can add one release count
 * using getAndAdd of RC_UNIT, rather than CAS, when returning
 * from a blocked join. Other updates entail multiple subfields
 * and masking, requiring CAS.
 *
 * The limits packed in field "bounds" are also offset by the
 * parallelism level to make them comparable to the ctl rc and tc
 * fields.
 */

// Lower and upper word masks
private static final long SP_MASK    = 0xffffffffL;  // low 32 bits (sp)
private static final long UC_MASK    = ~SP_MASK;     // high 32 bits (counts)

// Release counts
private static final int  RC_SHIFT   = 48;           // RC is bits 48..63
private static final long RC_UNIT    = 0x0001L << RC_SHIFT;
private static final long RC_MASK    = 0xffffL << RC_SHIFT;

// Total counts
private static final int  TC_SHIFT   = 32;           // TC is bits 32..47
private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
private static final long TC_MASK    = 0xffffL << TC_SHIFT;
// top bit of the 16-bit TC subfield, i.e. set while TC is negative
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

// Instance fields

volatile long stealCount;            // collects worker nsteals
final long keepAlive;                // milliseconds before dropping if idle
int indexSeed;                       // next worker index
final int bounds;                    // min, max threads packed as shorts
volatile int mode;                   // parallelism, runstate, queue mode
WorkQueue[] workQueues;              // main registry
final String workerNamePrefix;       // for worker thread string; sync lock
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh;  // per-worker UEH
final Predicate<? super ForkJoinPool> saturate;

@jdk.internal.vm.annotation.Contended("fjpctl") // segregate
volatile long ctl;                   // main pool control

// Creating, registering and deregistering workers

/**
 * Tries to construct and start one worker. Assumes that total
 * count has already been incremented as a reservation. Invokes
 * deregisterWorker on any failure.
1321 * 1322 * @return true if successful 1323 */ 1324 private boolean createWorker() { 1325 ForkJoinWorkerThreadFactory fac = factory; 1326 Throwable ex = null; 1327 ForkJoinWorkerThread wt = null; 1328 try { 1329 if (fac != null && (wt = fac.newThread(this)) != null) { 1330 wt.start(); 1331 return true; 1332 } 1333 } catch (Throwable rex) { 1334 ex = rex; 1335 } 1336 deregisterWorker(wt, ex); 1337 return false; 1338 } 1339 1340 /** 1341 * Tries to add one worker, incrementing ctl counts before doing 1342 * so, relying on createWorker to back out on failure. 1343 * 1344 * @param c incoming ctl value, with total count negative and no 1345 * idle workers. On CAS failure, c is refreshed and retried if 1346 * this holds (otherwise, a new worker is not needed). 1347 */ 1348 private void tryAddWorker(long c) { 1349 do { 1350 long nc = ((RC_MASK & (c + RC_UNIT)) | 1351 (TC_MASK & (c + TC_UNIT))); 1352 if (ctl == c && CTL.compareAndSet(this, c, nc)) { 1353 createWorker(); 1354 break; 1355 } 1356 } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); 1357 } 1358 1359 /** 1360 * Callback from ForkJoinWorkerThread constructor to establish and 1361 * record its WorkQueue. 
*
 * @param wt the worker thread
 * @return the worker's queue
 */
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    wt.setDaemon(true);                             // configure thread
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);
    int tid = 0;                                    // for thread name
    int idbits = mode & FIFO;
    String prefix = workerNamePrefix;
    WorkQueue w = new WorkQueue(this, wt);
    if (prefix != null) {
        // workerNamePrefix doubles as the lock guarding workQueues
        synchronized (prefix) {
            WorkQueue[] ws = workQueues; int n;
            int s = indexSeed += SEED_INCREMENT;
            // keep only seed bits outside the index, FIFO and
            // QUIET/UNSIGNALLED positions
            idbits |= (s & ~(SMASK | FIFO | DORMANT));
            if (ws != null && (n = ws.length) > 1) {
                int m = n - 1;
                tid = m & ((s << 1) | 1);           // odd-numbered indices
                for (int probes = n >>> 1;;) {      // find empty slot
                    WorkQueue q;
                    if ((q = ws[tid]) == null || q.phase == QUIET)
                        break;
                    else if (--probes == 0) {
                        tid = n | 1;                // resize below
                        break;
                    }
                    else
                        tid = (tid + 2) & m;
                }
                w.phase = w.id = tid | idbits;      // now publishable

                if (tid < n)
                    ws[tid] = w;
                else {                              // expand array
                    int an = n << 1;
                    WorkQueue[] as = new WorkQueue[an];
                    as[tid] = w;
                    int am = an - 1;
                    // each pass handles one even slot, then one odd slot
                    for (int j = 0; j < n; ++j) {
                        WorkQueue v;                // copy external queue
                        if ((v = ws[j]) != null)    // position may change
                            as[v.id & am & SQMASK] = v;
                        if (++j >= n)
                            break;
                        as[j] = ws[j];              // copy worker
                    }
                    workQueues = as;
                }
            }
        }
        wt.setName(prefix.concat(Integer.toString(tid)));
    }
    return w;
}

/**
 * Final callback from terminating worker, as well as upon failure
 * to construct or start a worker. Removes record of worker from
 * array, and adjusts counts. If pool is shutting down, tries to
 * complete termination.
*
 * @param wt the worker thread, or null if construction failed
 * @param ex the exception causing failure, or null if none
 */
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
    WorkQueue w = null;
    int phase = 0;
    if (wt != null && (w = wt.workQueue) != null) {
        Object lock = workerNamePrefix;
        int wid = w.id;
        long ns = (long)w.nsteals & 0xffffffffL;    // nsteals as unsigned
        if (lock != null) {
            synchronized (lock) {
                WorkQueue[] ws; int n, i;         // remove index from array
                if ((ws = workQueues) != null && (n = ws.length) > 0 &&
                    ws[i = wid & (n - 1)] == w)
                    ws[i] = null;
                stealCount += ns;
            }
        }
        phase = w.phase;
    }
    if (phase != QUIET) {                         // else pre-adjusted
        long c;                                   // decrement counts
        // retry until both RC and TC are decremented; sp bits are kept
        do {} while (!CTL.weakCompareAndSet
                     (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
                                      (TC_MASK & (c - TC_UNIT)) |
                                      (SP_MASK & c))));
    }
    if (w != null)
        w.cancelAll();                            // cancel remaining tasks

    if (!tryTerminate(false, false) &&            // possibly replace worker
        w != null && w.array != null)             // avoid repeated failures
        signalWork();

    if (ex == null)                               // help clean on way out
        ForkJoinTask.helpExpungeStaleExceptions();
    else                                          // rethrow
        ForkJoinTask.rethrow(ex);
}

/**
 * Tries to create or release a worker if too few are running.
*/
final void signalWork() {
    for (;;) {
        long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
        if ((c = ctl) >= 0L)                      // enough workers
            break;
        else if ((sp = (int)c) == 0) {            // no idle workers
            if ((c & ADD_WORKER) != 0L)           // too few workers
                tryAddWorker(c);
            break;
        }
        else if ((ws = workQueues) == null)
            break;                                // unstarted/terminated
        else if (ws.length <= (i = sp & SMASK))
            break;                                // terminated
        else if ((v = ws[i]) == null)
            break;                                // terminating
        else {
            // v is the queue indexed by the low bits of sp; try to
            // pop it from the ctl stack and add one release count
            int np = sp & ~UNSIGNALLED;
            int vp = v.phase;
            long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
            Thread vt = v.owner;
            if (sp == vp && CTL.compareAndSet(this, c, nc)) {
                v.phase = np;
                if (vt != null && v.source < 0)   // unpark only if source is negative
                    LockSupport.unpark(vt);
                break;
            }
            // otherwise loop and retry with a fresh ctl value
        }
    }
}

/**
 * Tries to decrement counts (sometimes implicitly) and possibly
 * arrange for a compensating worker in preparation for blocking:
 * If not all core workers yet exist, creates one, else if any are
 * unreleased (possibly including caller) releases one, else if
 * fewer than the minimum allowed number of workers running,
 * checks to see that they are all active, and if so creates an
 * extra worker unless over maximum limit and policy is to
 * saturate. Most of these steps can fail due to interference, in
 * which case 0 is returned so caller will retry. A negative
 * return value indicates that the caller doesn't need to
 * re-adjust counts when later unblocked.
*
 * @param w the queue of the worker that is about to block
 * @return 1: block then adjust, -1: block without adjust, 0 : retry
 * @throws RejectedExecutionException if a thread limit is reached,
 *         the saturate predicate is absent or returns false, and at
 *         least as many workers as the parallelism level were seen
 *         BLOCKED or WAITING
 */
private int tryCompensate(WorkQueue w) {
    int t, n, sp;
    long c = ctl;
    WorkQueue[] ws = workQueues;
    // t is the signed TC subfield; when negative, fall through to
    // the "expand pool" step at the bottom
    if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
        if (ws == null || (n = ws.length) <= 0 || w == null)
            return 0;                             // disabled
        else if ((sp = (int)c) != 0) {            // replace or release
            WorkQueue v = ws[sp & (n - 1)];
            int wp = w.phase;
            long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
            int np = sp & ~UNSIGNALLED;
            if (v != null) {
                int vp = v.phase;
                Thread vt = v.owner;
                long nc = ((long)v.stackPred & SP_MASK) | uc;
                if (vp == sp && CTL.compareAndSet(this, c, nc)) {
                    v.phase = np;
                    if (vt != null && v.source < 0)
                        LockSupport.unpark(vt);
                    return (wp < 0) ? -1 : 1;
                }
            }
            return 0;
        }
        else if ((int)(c >> RC_SHIFT) -           // reduce parallelism
                 (short)(bounds & SMASK) > 0) {
            long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
            return CTL.compareAndSet(this, c, nc) ? 1 : 0;
        }
        else {                                    // validate
            int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
            boolean unstable = false;
            for (int i = 1; i < n; i += 2) {      // odd (worker) slots
                WorkQueue q; Thread wt; Thread.State ts;
                if ((q = ws[i]) != null) {
                    if (q.source == 0) {
                        unstable = true;
                        break;
                    }
                    else {
                        --tc;
                        if ((wt = q.owner) != null &&
                            ((ts = wt.getState()) == Thread.State.BLOCKED ||
                             ts == Thread.State.WAITING))
                            ++bc;                 // worker is blocking
                    }
                }
            }
            if (unstable || tc != 0 || ctl != c)
                return 0;                         // inconsistent
            else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
                Predicate<? super ForkJoinPool> sat;
                if ((sat = saturate) != null && sat.test(this))
                    return -1;
                else if (bc < pc) {               // lagging
                    Thread.yield();               // for retry spins
                    return 0;
                }
                else
                    throw new RejectedExecutionException(
                        "Thread limit exceeded replacing blocked worker");
            }
        }
    }

    long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
    return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0;
}

/**
 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
 * See above for explanation.
 *
 * @param w the calling worker's queue; its task array is
 *        allocated here
 */
final void runWorker(WorkQueue w) {
    int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
    w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
    for (;;) {
        int phase;
        if (scan(w, r)) {                     // scan until apparently empty
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
        }
        else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
            long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
            long c, nc;
            // push w onto the ctl stack, decrementing the release count
            do {
                w.stackPred = (int)(c = ctl);
                nc = ((c - RC_UNIT) & UC_MASK) | np;
            } while (!CTL.weakCompareAndSet(this, c, nc));
        }
        else {                                // already queued
            int pred = w.stackPred;
            Thread.interrupted();             // clear before park
            w.source = DORMANT;               // enable signal
            long c = ctl;
            int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
            if (md < 0)                       // terminating
                break;
            else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
                     tryTerminate(false, false))
                break;                        // quiescent shutdown
            else if (rc <= 0 && pred != 0 && phase == (int)c) {
                // w is on top of the stack with a predecessor and no
                // workers are released: wait with a deadline
                long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
                long d = keepAlive + System.currentTimeMillis();
                LockSupport.parkUntil(this, d);
                if (ctl == c &&               // drop on timeout if all idle
                    d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
                    CTL.compareAndSet(this, c, nc)) {
                    w.phase = QUIET;
                    break;
                }
            }
            else if (w.phase < 0)
                LockSupport.park(this);       // OK if spuriously woken
            w.source = 0;                     // disable signal
        }
    }
}

/**
 * Scans for and if found executes one or more top-level tasks from a queue.
 *
 * @param w the calling worker's queue
 * @param r random value used for the starting index and the
 *        fairness bound passed to topLevelExec
 * @return true if found an apparently non-empty queue, and
 * possibly ran task(s).
 */
private boolean scan(WorkQueue w, int r) {
    WorkQueue[] ws; int n;
    if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
        // n also serves as the countdown of slots left to visit
        for (int m = n - 1, j = r & m;;) {
            WorkQueue q; int b;
            if ((q = ws[j]) != null && q.top != (b = q.base)) {
                int qid = q.id;
                ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
                if ((a = q.array) != null && (cap = a.length) > 0) {
                    t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
                    if (q.base == b++ && t != null &&
                        QA.compareAndSet(a, k, t, null)) {
                        q.base = b;
                        w.source = qid;
                        if (q.top - b > 0)    // more tasks remain in q
                            signalWork();
                        w.topLevelExec(t, q,  // random fairness bound
                                       r & ((n << TOP_BOUND_SHIFT) - 1));
                    }
                }
                return true;
            }
            else if (--n > 0)
                j = (j + 1) & m;
            else
                break;
        }
    }
    return false;
}

/**
 * Helps and/or blocks until the given task is done or timeout.
 * First tries locally helping, then scans other queues for a task
 * produced by one of w's stealers; compensating and blocking if
 * none are found (rescanning if tryCompensate fails).
1677 * 1678 * @param w caller 1679 * @param task the task 1680 * @param deadline for timed waits, if nonzero 1681 * @return task status on exit 1682 */ 1683 final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) { 1684 int s = 0; 1685 int seed = ThreadLocalRandom.nextSecondarySeed(); 1686 if (w != null && task != null && 1687 (!(task instanceof CountedCompleter) || 1688 (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) { 1689 w.tryRemoveAndExec(task); 1690 int src = w.source, id = w.id; 1691 int r = (seed >>> 16) | 1, step = (seed & ~1) | 2; 1692 s = task.status; 1693 while (s >= 0) { 1694 WorkQueue[] ws; 1695 int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1; 1696 while (n > 0) { 1697 WorkQueue q; int b; 1698 if ((q = ws[r & m]) != null && q.source == id && 1699 q.top != (b = q.base)) { 1700 ForkJoinTask<?>[] a; int cap, k; 1701 int qid = q.id; 1702 if ((a = q.array) != null && (cap = a.length) > 0) { 1703 ForkJoinTask<?> t = (ForkJoinTask<?>) 1704 QA.getAcquire(a, k = (cap - 1) & b); 1705 if (q.source == id && q.base == b++ && 1706 t != null && QA.compareAndSet(a, k, t, null)) { 1707 q.base = b; 1708 w.source = qid; 1709 t.doExec(); 1710 w.source = src; 1711 } 1712 } 1713 break; 1714 } 1715 else { 1716 r += step; 1717 --n; 1718 } 1719 } 1720 if ((s = task.status) < 0) 1721 break; 1722 else if (n == 0) { // empty scan 1723 long ms, ns; int block; 1724 if (deadline == 0L) 1725 ms = 0L; // untimed 1726 else if ((ns = deadline - System.nanoTime()) <= 0L) 1727 break; // timeout 1728 else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) 1729 ms = 1L; // avoid 0 for timed wait 1730 if ((block = tryCompensate(w)) != 0) { 1731 task.internalWait(ms); 1732 CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L); 1733 } 1734 s = task.status; 1735 } 1736 } 1737 } 1738 return s; 1739 } 1740 1741 /** 1742 * Runs tasks until {@code isQuiescent()}. 
Rather than blocking 1743 * when tasks cannot be found, rescans until all others cannot 1744 * find tasks either. 1745 */ 1746 final void helpQuiescePool(WorkQueue w) { 1747 int prevSrc = w.source; 1748 int seed = ThreadLocalRandom.nextSecondarySeed(); 1749 int r = seed >>> 16, step = r | 1; 1750 for (int source = prevSrc, released = -1;;) { // -1 until known 1751 ForkJoinTask<?> localTask; WorkQueue[] ws; 1752 while ((localTask = w.nextLocalTask()) != null) 1753 localTask.doExec(); 1754 if (w.phase >= 0 && released == -1) 1755 released = 1; 1756 boolean quiet = true, empty = true; 1757 int n = (ws = workQueues) == null ? 0 : ws.length; 1758 for (int m = n - 1; n > 0; r += step, --n) { 1759 WorkQueue q; int b; 1760 if ((q = ws[r & m]) != null) { 1761 int qs = q.source; 1762 if (q.top != (b = q.base)) { 1763 quiet = empty = false; 1764 ForkJoinTask<?>[] a; int cap, k; 1765 int qid = q.id; 1766 if ((a = q.array) != null && (cap = a.length) > 0) { 1767 if (released == 0) { // increment 1768 released = 1; 1769 CTL.getAndAdd(this, RC_UNIT); 1770 } 1771 ForkJoinTask<?> t = (ForkJoinTask<?>) 1772 QA.getAcquire(a, k = (cap - 1) & b); 1773 if (q.base == b++ && t != null && 1774 QA.compareAndSet(a, k, t, null)) { 1775 q.base = b; 1776 w.source = qid; 1777 t.doExec(); 1778 w.source = source = prevSrc; 1779 } 1780 } 1781 break; 1782 } 1783 else if ((qs & QUIET) == 0) 1784 quiet = false; 1785 } 1786 } 1787 if (quiet) { 1788 if (released == 0) 1789 CTL.getAndAdd(this, RC_UNIT); 1790 w.source = prevSrc; 1791 break; 1792 } 1793 else if (empty) { 1794 if (source != QUIET) 1795 w.source = source = QUIET; 1796 if (released == 1) { // decrement 1797 released = 0; 1798 CTL.getAndAdd(this, RC_MASK & -RC_UNIT); 1799 } 1800 } 1801 } 1802 } 1803 1804 /** 1805 * Scans for and returns a polled task, if available. 1806 * Used only for untracked polls. 
1807 * 1808 * @param submissionsOnly if true, only scan submission queues 1809 */ 1810 private ForkJoinTask<?> pollScan(boolean submissionsOnly) { 1811 WorkQueue[] ws; int n; 1812 rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null && 1813 (n = ws.length) > 0) { 1814 int m = n - 1; 1815 int r = ThreadLocalRandom.nextSecondarySeed(); 1816 int h = r >>> 16; 1817 int origin, step; 1818 if (submissionsOnly) { 1819 origin = (r & ~1) & m; // even indices and steps 1820 step = (h & ~1) | 2; 1821 } 1822 else { 1823 origin = r & m; 1824 step = h | 1; 1825 } 1826 boolean nonempty = false; 1827 for (int i = origin, oldSum = 0, checkSum = 0;;) { 1828 WorkQueue q; 1829 if ((q = ws[i]) != null) { 1830 int b; ForkJoinTask<?> t; 1831 if (q.top - (b = q.base) > 0) { 1832 nonempty = true; 1833 if ((t = q.poll()) != null) 1834 return t; 1835 } 1836 else 1837 checkSum += b + q.id; 1838 } 1839 if ((i = (i + step) & m) == origin) { 1840 if (!nonempty && oldSum == (oldSum = checkSum)) 1841 break rescan; 1842 checkSum = 0; 1843 nonempty = false; 1844 } 1845 } 1846 } 1847 return null; 1848 } 1849 1850 /** 1851 * Gets and removes a local or stolen task for the given worker. 1852 * 1853 * @return a task, if available 1854 */ 1855 final ForkJoinTask<?> nextTaskFor(WorkQueue w) { 1856 ForkJoinTask<?> t; 1857 if (w == null || (t = w.nextLocalTask()) == null) 1858 t = pollScan(false); 1859 return t; 1860 } 1861 1862 // External operations 1863 1864 /** 1865 * Adds the given task to a submission queue at submitter's 1866 * current queue, creating one if null or contended. 1867 * 1868 * @param task the task. Caller must ensure non-null. 
1869 */ 1870 final void externalPush(ForkJoinTask<?> task) { 1871 int r; // initialize caller's probe 1872 if ((r = ThreadLocalRandom.getProbe()) == 0) { 1873 ThreadLocalRandom.localInit(); 1874 r = ThreadLocalRandom.getProbe(); 1875 } 1876 for (;;) { 1877 WorkQueue q; 1878 int md = mode, n; 1879 WorkQueue[] ws = workQueues; 1880 if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0) 1881 throw new RejectedExecutionException(); 1882 else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue 1883 int qid = (r | QUIET) & ~(FIFO | OWNED); 1884 Object lock = workerNamePrefix; 1885 ForkJoinTask<?>[] qa = 1886 new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; 1887 q = new WorkQueue(this, null); 1888 q.array = qa; 1889 q.id = qid; 1890 q.source = QUIET; 1891 if (lock != null) { // unless disabled, lock pool to install 1892 synchronized (lock) { 1893 WorkQueue[] vs; int i, vn; 1894 if ((vs = workQueues) != null && (vn = vs.length) > 0 && 1895 vs[i = qid & (vn - 1) & SQMASK] == null) 1896 vs[i] = q; // else another thread already installed 1897 } 1898 } 1899 } 1900 else if (!q.tryLockPhase()) // move if busy 1901 r = ThreadLocalRandom.advanceProbe(r); 1902 else { 1903 if (q.lockedPush(task)) 1904 signalWork(); 1905 return; 1906 } 1907 } 1908 } 1909 1910 /** 1911 * Pushes a possibly-external submission. 1912 */ 1913 private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) { 1914 Thread t; ForkJoinWorkerThread w; WorkQueue q; 1915 if (task == null) 1916 throw new NullPointerException(); 1917 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && 1918 (w = (ForkJoinWorkerThread)t).pool == this && 1919 (q = w.workQueue) != null) 1920 q.push(task); 1921 else 1922 externalPush(task); 1923 return task; 1924 } 1925 1926 /** 1927 * Returns common pool queue for an external thread. 
1928 */ 1929 static WorkQueue commonSubmitterQueue() { 1930 ForkJoinPool p = common; 1931 int r = ThreadLocalRandom.getProbe(); 1932 WorkQueue[] ws; int n; 1933 return (p != null && (ws = p.workQueues) != null && 1934 (n = ws.length) > 0) ? 1935 ws[(n - 1) & r & SQMASK] : null; 1936 } 1937 1938 /** 1939 * Performs tryUnpush for an external submitter. 1940 */ 1941 final boolean tryExternalUnpush(ForkJoinTask<?> task) { 1942 int r = ThreadLocalRandom.getProbe(); 1943 WorkQueue[] ws; WorkQueue w; int n; 1944 return ((ws = workQueues) != null && 1945 (n = ws.length) > 0 && 1946 (w = ws[(n - 1) & r & SQMASK]) != null && 1947 w.tryLockedUnpush(task)); 1948 } 1949 1950 /** 1951 * Performs helpComplete for an external submitter. 1952 */ 1953 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) { 1954 int r = ThreadLocalRandom.getProbe(); 1955 WorkQueue[] ws; WorkQueue w; int n; 1956 return ((ws = workQueues) != null && (n = ws.length) > 0 && 1957 (w = ws[(n - 1) & r & SQMASK]) != null) ? 1958 w.helpCC(task, maxTasks, true) : 0; 1959 } 1960 1961 /** 1962 * Tries to steal and run tasks within the target's computation. 1963 * The maxTasks argument supports external usages; internal calls 1964 * use zero, allowing unbounded steps (external calls trap 1965 * non-positive values). 1966 * 1967 * @param w caller 1968 * @param maxTasks if non-zero, the maximum number of other tasks to run 1969 * @return task status on exit 1970 */ 1971 final int helpComplete(WorkQueue w, CountedCompleter<?> task, 1972 int maxTasks) { 1973 return (w == null) ? 0 : w.helpCC(task, maxTasks, false); 1974 } 1975 1976 /** 1977 * Returns a cheap heuristic guide for task partitioning when 1978 * programmers, frameworks, tools, or languages have little or no 1979 * idea about task granularity. In essence, by offering this 1980 * method, we ask users only about tradeoffs in overhead vs 1981 * expected throughput and its variance, rather than how finely to 1982 * partition tasks. 
*
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible. Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of computation tree should generate more than those nearer the
     * bottom. In perfect steady state, each thread is at
     * approximately the same level of computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger (but not much larger)
     * than 1 to both smooth over transient shortages and hedge
     * against uneven progress; as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length. However, this strategy alone
     * leads to serious mis-estimates in some non-steady-state
     * conditions (ramp-up, ramp-down, other stalls). We can detect
     * many of these by further considering the number of "idle"
     * threads, that are known to have zero queued tasks, so
     * compensate by a factor of (#idle/#active) threads.
2017 */ 2018 static int getSurplusQueuedTaskCount() { 2019 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q; 2020 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && 2021 (pool = (wt = (ForkJoinWorkerThread)t).pool) != null && 2022 (q = wt.workQueue) != null) { 2023 int p = pool.mode & SMASK; 2024 int a = p + (int)(pool.ctl >> RC_SHIFT); 2025 int n = q.top - q.base; 2026 return n - (a > (p >>>= 1) ? 0 : 2027 a > (p >>>= 1) ? 1 : 2028 a > (p >>>= 1) ? 2 : 2029 a > (p >>>= 1) ? 4 : 2030 8); 2031 } 2032 return 0; 2033 } 2034 2035 // Termination 2036 2037 /** 2038 * Possibly initiates and/or completes termination. 2039 * 2040 * @param now if true, unconditionally terminate, else only 2041 * if no work and no active workers 2042 * @param enable if true, terminate when next possible 2043 * @return true if terminating or terminated 2044 */ 2045 private boolean tryTerminate(boolean now, boolean enable) { 2046 int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED 2047 2048 while (((md = mode) & SHUTDOWN) == 0) { 2049 if (!enable || this == common) // cannot shutdown 2050 return false; 2051 else 2052 MODE.compareAndSet(this, md, md | SHUTDOWN); 2053 } 2054 2055 while (((md = mode) & STOP) == 0) { // try to initiate termination 2056 if (!now) { // check if quiescent & empty 2057 for (long oldSum = 0L;;) { // repeat until stable 2058 boolean running = false; 2059 long checkSum = ctl; 2060 WorkQueue[] ws = workQueues; 2061 if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0) 2062 running = true; 2063 else if (ws != null) { 2064 WorkQueue w; 2065 for (int i = 0; i < ws.length; ++i) { 2066 if ((w = ws[i]) != null) { 2067 int s = w.source, p = w.phase; 2068 int d = w.id, b = w.base; 2069 if (b != w.top || 2070 ((d & 1) == 1 && (s >= 0 || p >= 0))) { 2071 running = true; 2072 break; // working, scanning, or have work 2073 } 2074 checkSum += (((long)s << 48) + ((long)p << 32) + 2075 ((long)b << 16) + (long)d); 2076 } 2077 } 2078 } 
2079 if (((md = mode) & STOP) != 0) 2080 break; // already triggered 2081 else if (running) 2082 return false; 2083 else if (workQueues == ws && oldSum == (oldSum = checkSum)) 2084 break; 2085 } 2086 } 2087 if ((md & STOP) == 0) 2088 MODE.compareAndSet(this, md, md | STOP); 2089 } 2090 2091 while (((md = mode) & TERMINATED) == 0) { // help terminate others 2092 for (long oldSum = 0L;;) { // repeat until stable 2093 WorkQueue[] ws; WorkQueue w; 2094 long checkSum = ctl; 2095 if ((ws = workQueues) != null) { 2096 for (int i = 0; i < ws.length; ++i) { 2097 if ((w = ws[i]) != null) { 2098 ForkJoinWorkerThread wt = w.owner; 2099 w.cancelAll(); // clear queues 2100 if (wt != null) { 2101 try { // unblock join or park 2102 wt.interrupt(); 2103 } catch (Throwable ignore) { 2104 } 2105 } 2106 checkSum += ((long)w.phase << 32) + w.base; 2107 } 2108 } 2109 } 2110 if (((md = mode) & TERMINATED) != 0 || 2111 (workQueues == ws && oldSum == (oldSum = checkSum))) 2112 break; 2113 } 2114 if ((md & TERMINATED) != 0) 2115 break; 2116 else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0) 2117 break; 2118 else if (MODE.compareAndSet(this, md, md | TERMINATED)) { 2119 synchronized (this) { 2120 notifyAll(); // for awaitTermination 2121 } 2122 break; 2123 } 2124 } 2125 return true; 2126 } 2127 2128 // Exported methods 2129 2130 // Constructors 2131 2132 /** 2133 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 2134 * java.lang.Runtime#availableProcessors}, using defaults for all 2135 * other parameters (see {@link #ForkJoinPool(int, 2136 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, 2137 * int, int, int, Predicate, long, TimeUnit)}). 
2138 * 2139 * @throws SecurityException if a security manager exists and 2140 * the caller is not permitted to modify threads 2141 * because it does not hold {@link 2142 * java.lang.RuntimePermission}{@code ("modifyThread")} 2143 */ 2144 public ForkJoinPool() { 2145 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), 2146 defaultForkJoinWorkerThreadFactory, null, false, 2147 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2148 } 2149 2150 /** 2151 * Creates a {@code ForkJoinPool} with the indicated parallelism 2152 * level, using defaults for all other parameters (see {@link 2153 * #ForkJoinPool(int, ForkJoinWorkerThreadFactory, 2154 * UncaughtExceptionHandler, boolean, int, int, int, Predicate, 2155 * long, TimeUnit)}). 2156 * 2157 * @param parallelism the parallelism level 2158 * @throws IllegalArgumentException if parallelism less than or 2159 * equal to zero, or greater than implementation limit 2160 * @throws SecurityException if a security manager exists and 2161 * the caller is not permitted to modify threads 2162 * because it does not hold {@link 2163 * java.lang.RuntimePermission}{@code ("modifyThread")} 2164 */ 2165 public ForkJoinPool(int parallelism) { 2166 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false, 2167 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2168 } 2169 2170 /** 2171 * Creates a {@code ForkJoinPool} with the given parameters (using 2172 * defaults for others -- see {@link #ForkJoinPool(int, 2173 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, 2174 * int, int, int, Predicate, long, TimeUnit)}). 2175 * 2176 * @param parallelism the parallelism level. For default value, 2177 * use {@link java.lang.Runtime#availableProcessors}. 2178 * @param factory the factory for creating new threads. For default value, 2179 * use {@link #defaultForkJoinWorkerThreadFactory}. 
2180 * @param handler the handler for internal worker threads that 2181 * terminate due to unrecoverable errors encountered while executing 2182 * tasks. For default value, use {@code null}. 2183 * @param asyncMode if true, 2184 * establishes local first-in-first-out scheduling mode for forked 2185 * tasks that are never joined. This mode may be more appropriate 2186 * than default locally stack-based mode in applications in which 2187 * worker threads only process event-style asynchronous tasks. 2188 * For default value, use {@code false}. 2189 * @throws IllegalArgumentException if parallelism less than or 2190 * equal to zero, or greater than implementation limit 2191 * @throws NullPointerException if the factory is null 2192 * @throws SecurityException if a security manager exists and 2193 * the caller is not permitted to modify threads 2194 * because it does not hold {@link 2195 * java.lang.RuntimePermission}{@code ("modifyThread")} 2196 */ 2197 public ForkJoinPool(int parallelism, 2198 ForkJoinWorkerThreadFactory factory, 2199 UncaughtExceptionHandler handler, 2200 boolean asyncMode) { 2201 this(parallelism, factory, handler, asyncMode, 2202 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2203 } 2204 2205 /** 2206 * Creates a {@code ForkJoinPool} with the given parameters. 2207 * 2208 * @param parallelism the parallelism level. For default value, 2209 * use {@link java.lang.Runtime#availableProcessors}. 2210 * 2211 * @param factory the factory for creating new threads. For 2212 * default value, use {@link #defaultForkJoinWorkerThreadFactory}. 2213 * 2214 * @param handler the handler for internal worker threads that 2215 * terminate due to unrecoverable errors encountered while 2216 * executing tasks. For default value, use {@code null}. 2217 * 2218 * @param asyncMode if true, establishes local first-in-first-out 2219 * scheduling mode for forked tasks that are never joined. 
* This
     * mode may be more appropriate than default locally stack-based
     * mode in applications in which worker threads only process
     * event-style asynchronous tasks. For default value, use {@code
     * false}.
     *
     * @param corePoolSize the number of threads to keep in the pool
     * (unless timed out after an elapsed keep-alive). Normally (and
     * by default) this is the same value as the parallelism level,
     * but may be set to a larger value to reduce dynamic overhead if
     * tasks regularly block. Using a smaller value (for example
     * {@code 0}) has the same effect as the default.
     *
     * @param maximumPoolSize the maximum number of threads allowed.
     * When the maximum is reached, attempts to replace blocked
     * threads fail. (However, because creation and termination of
     * different threads may overlap, and may be managed by the given
     * thread factory, this value may be transiently exceeded.) To
     * arrange the same value as is used by default for the common
     * pool, use {@code 256} plus the {@code parallelism} level. (By
     * default, the common pool allows a maximum of 256 spare
     * threads.) Using a value (for example {@code
     * Integer.MAX_VALUE}) larger than the implementation's total
     * thread limit has the same effect as using this limit (which is
     * the default).
     *
     * @param minimumRunnable the minimum allowed number of core
     * threads not blocked by a join or {@link ManagedBlocker}. To
     * ensure progress, when too few unblocked threads exist and
     * unexecuted tasks may exist, new threads are constructed, up to
     * the given maximumPoolSize. For the default value, use {@code
     * 1}, that ensures liveness. A larger value might improve
     * throughput in the presence of blocked activities, but might
     * not, due to increased overhead.
A value of zero may be 2253 * acceptable when submitted tasks cannot have dependencies 2254 * requiring additional threads. 2255 * 2256 * @param saturate if non-null, a predicate invoked upon attempts 2257 * to create more than the maximum total allowed threads. By 2258 * default, when a thread is about to block on a join or {@link 2259 * ManagedBlocker}, but cannot be replaced because the 2260 * maximumPoolSize would be exceeded, a {@link 2261 * RejectedExecutionException} is thrown. But if this predicate 2262 * returns {@code true}, then no exception is thrown, so the pool 2263 * continues to operate with fewer than the target number of 2264 * runnable threads, which might not ensure progress. 2265 * 2266 * @param keepAliveTime the elapsed time since last use before 2267 * a thread is terminated (and then later replaced if needed). 2268 * For the default value, use {@code 60, TimeUnit.SECONDS}. 2269 * 2270 * @param unit the time unit for the {@code keepAliveTime} argument 2271 * 2272 * @throws IllegalArgumentException if parallelism is less than or 2273 * equal to zero, or is greater than implementation limit, 2274 * or if maximumPoolSize is less than parallelism, 2275 * of if the keepAliveTime is less than or equal to zero. 2276 * @throws NullPointerException if the factory is null 2277 * @throws SecurityException if a security manager exists and 2278 * the caller is not permitted to modify threads 2279 * because it does not hold {@link 2280 * java.lang.RuntimePermission}{@code ("modifyThread")} 2281 * @since 9 2282 */ 2283 public ForkJoinPool(int parallelism, 2284 ForkJoinWorkerThreadFactory factory, 2285 UncaughtExceptionHandler handler, 2286 boolean asyncMode, 2287 int corePoolSize, 2288 int maximumPoolSize, 2289 int minimumRunnable, 2290 Predicate<? 
super ForkJoinPool> saturate, 2291 long keepAliveTime, 2292 TimeUnit unit) { 2293 // check, encode, pack parameters 2294 if (parallelism <= 0 || parallelism > MAX_CAP || 2295 maximumPoolSize < parallelism || keepAliveTime <= 0L) 2296 throw new IllegalArgumentException(); 2297 if (factory == null) 2298 throw new NullPointerException(); 2299 long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP); 2300 2301 int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP); 2302 long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) | 2303 (((long)(-parallelism) << RC_SHIFT) & RC_MASK)); 2304 int m = parallelism | (asyncMode ? FIFO : 0); 2305 int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism; 2306 int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP); 2307 int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH); 2308 int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots 2309 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; 2310 n = (n + 1) << 1; // power of two, including space for submission queues 2311 2312 this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-"; 2313 this.workQueues = new WorkQueue[n]; 2314 this.factory = factory; 2315 this.ueh = handler; 2316 this.saturate = saturate; 2317 this.keepAlive = ms; 2318 this.bounds = b; 2319 this.mode = m; 2320 this.ctl = c; 2321 checkPermission(); 2322 } 2323 2324 private static Object newInstanceFromSystemProperty(String property) 2325 throws ReflectiveOperationException { 2326 String className = System.getProperty(property); 2327 return (className == null) 2328 ? 
null 2329 : ClassLoader.getSystemClassLoader().loadClass(className) 2330 .getConstructor().newInstance(); 2331 } 2332 2333 /** 2334 * Constructor for common pool using parameters possibly 2335 * overridden by system properties 2336 */ 2337 private ForkJoinPool(byte forCommonPoolOnly) { 2338 int parallelism = -1; 2339 ForkJoinWorkerThreadFactory fac = null; 2340 UncaughtExceptionHandler handler = null; 2341 try { // ignore exceptions in accessing/parsing properties 2342 String pp = System.getProperty 2343 ("java.util.concurrent.ForkJoinPool.common.parallelism"); 2344 if (pp != null) 2345 parallelism = Integer.parseInt(pp); 2346 fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( 2347 "java.util.concurrent.ForkJoinPool.common.threadFactory"); 2348 handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( 2349 "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); 2350 } catch (Exception ignore) { 2351 } 2352 2353 if (fac == null) { 2354 if (System.getSecurityManager() == null) 2355 fac = defaultForkJoinWorkerThreadFactory; 2356 else // use security-managed default 2357 fac = new InnocuousForkJoinWorkerThreadFactory(); 2358 } 2359 if (parallelism < 0 && // default 1 less than #cores 2360 (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) 2361 parallelism = 1; 2362 if (parallelism > MAX_CAP) 2363 parallelism = MAX_CAP; 2364 2365 long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) | 2366 (((long)(-parallelism) << RC_SHIFT) & RC_MASK)); 2367 int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); 2368 int n = (parallelism > 1) ? 
parallelism - 1 : 1; 2369 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; 2370 n = (n + 1) << 1; 2371 2372 this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; 2373 this.workQueues = new WorkQueue[n]; 2374 this.factory = fac; 2375 this.ueh = handler; 2376 this.saturate = null; 2377 this.keepAlive = DEFAULT_KEEPALIVE; 2378 this.bounds = b; 2379 this.mode = parallelism; 2380 this.ctl = c; 2381 } 2382 2383 /** 2384 * Returns the common pool instance. This pool is statically 2385 * constructed; its run state is unaffected by attempts to {@link 2386 * #shutdown} or {@link #shutdownNow}. However this pool and any 2387 * ongoing processing are automatically terminated upon program 2388 * {@link System#exit}. Any program that relies on asynchronous 2389 * task processing to complete before program termination should 2390 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, 2391 * before exit. 2392 * 2393 * @return the common pool instance 2394 * @since 1.8 2395 */ 2396 public static ForkJoinPool commonPool() { 2397 // assert common != null : "static init error"; 2398 return common; 2399 } 2400 2401 // Execution methods 2402 2403 /** 2404 * Performs the given task, returning its result upon completion. 2405 * If the computation encounters an unchecked Exception or Error, 2406 * it is rethrown as the outcome of this invocation. Rethrown 2407 * exceptions behave in the same way as regular exceptions, but, 2408 * when possible, contain stack traces (as displayed for example 2409 * using {@code ex.printStackTrace()}) of both the current thread 2410 * as well as the thread actually encountering the exception; 2411 * minimally only the latter. 
2412 * 2413 * @param task the task 2414 * @param <T> the type of the task's result 2415 * @return the task's result 2416 * @throws NullPointerException if the task is null 2417 * @throws RejectedExecutionException if the task cannot be 2418 * scheduled for execution 2419 */ 2420 public <T> T invoke(ForkJoinTask<T> task) { 2421 if (task == null) 2422 throw new NullPointerException(); 2423 externalSubmit(task); 2424 return task.join(); 2425 } 2426 2427 /** 2428 * Arranges for (asynchronous) execution of the given task. 2429 * 2430 * @param task the task 2431 * @throws NullPointerException if the task is null 2432 * @throws RejectedExecutionException if the task cannot be 2433 * scheduled for execution 2434 */ 2435 public void execute(ForkJoinTask<?> task) { 2436 externalSubmit(task); 2437 } 2438 2439 // AbstractExecutorService methods 2440 2441 /** 2442 * @throws NullPointerException if the task is null 2443 * @throws RejectedExecutionException if the task cannot be 2444 * scheduled for execution 2445 */ 2446 public void execute(Runnable task) { 2447 if (task == null) 2448 throw new NullPointerException(); 2449 ForkJoinTask<?> job; 2450 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 2451 job = (ForkJoinTask<?>) task; 2452 else 2453 job = new ForkJoinTask.RunnableExecuteAction(task); 2454 externalSubmit(job); 2455 } 2456 2457 /** 2458 * Submits a ForkJoinTask for execution. 
2459 * 2460 * @param task the task to submit 2461 * @param <T> the type of the task's result 2462 * @return the task 2463 * @throws NullPointerException if the task is null 2464 * @throws RejectedExecutionException if the task cannot be 2465 * scheduled for execution 2466 */ 2467 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 2468 return externalSubmit(task); 2469 } 2470 2471 /** 2472 * @throws NullPointerException if the task is null 2473 * @throws RejectedExecutionException if the task cannot be 2474 * scheduled for execution 2475 */ 2476 public <T> ForkJoinTask<T> submit(Callable<T> task) { 2477 return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task)); 2478 } 2479 2480 /** 2481 * @throws NullPointerException if the task is null 2482 * @throws RejectedExecutionException if the task cannot be 2483 * scheduled for execution 2484 */ 2485 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 2486 return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result)); 2487 } 2488 2489 /** 2490 * @throws NullPointerException if the task is null 2491 * @throws RejectedExecutionException if the task cannot be 2492 * scheduled for execution 2493 */ 2494 @SuppressWarnings("unchecked") 2495 public ForkJoinTask<?> submit(Runnable task) { 2496 if (task == null) 2497 throw new NullPointerException(); 2498 return externalSubmit((task instanceof ForkJoinTask<?>) 2499 ? (ForkJoinTask<Void>) task // avoid re-wrap 2500 : new ForkJoinTask.AdaptedRunnableAction(task)); 2501 } 2502 2503 /** 2504 * @throws NullPointerException {@inheritDoc} 2505 * @throws RejectedExecutionException {@inheritDoc} 2506 */ 2507 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { 2508 // In previous versions of this class, this method constructed 2509 // a task to run ForkJoinTask.invokeAll, but now external 2510 // invocation of multiple tasks is at least as efficient. 
2511 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); 2512 2513 try { 2514 for (Callable<T> t : tasks) { 2515 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); 2516 futures.add(f); 2517 externalSubmit(f); 2518 } 2519 for (int i = 0, size = futures.size(); i < size; i++) 2520 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); 2521 return futures; 2522 } catch (Throwable t) { 2523 for (int i = 0, size = futures.size(); i < size; i++) 2524 futures.get(i).cancel(false); 2525 throw t; 2526 } 2527 } 2528 2529 /** 2530 * Returns the factory used for constructing new workers. 2531 * 2532 * @return the factory used for constructing new workers 2533 */ 2534 public ForkJoinWorkerThreadFactory getFactory() { 2535 return factory; 2536 } 2537 2538 /** 2539 * Returns the handler for internal worker threads that terminate 2540 * due to unrecoverable errors encountered while executing tasks. 2541 * 2542 * @return the handler, or {@code null} if none 2543 */ 2544 public UncaughtExceptionHandler getUncaughtExceptionHandler() { 2545 return ueh; 2546 } 2547 2548 /** 2549 * Returns the targeted parallelism level of this pool. 2550 * 2551 * @return the targeted parallelism level of this pool 2552 */ 2553 public int getParallelism() { 2554 int par = mode & SMASK; 2555 return (par > 0) ? par : 1; 2556 } 2557 2558 /** 2559 * Returns the targeted parallelism level of the common pool. 2560 * 2561 * @return the targeted parallelism level of the common pool 2562 * @since 1.8 2563 */ 2564 public static int getCommonPoolParallelism() { 2565 return COMMON_PARALLELISM; 2566 } 2567 2568 /** 2569 * Returns the number of worker threads that have started but not 2570 * yet terminated. The result returned by this method may differ 2571 * from {@link #getParallelism} when threads are created to 2572 * maintain parallelism when others are cooperatively blocked. 
2573 * 2574 * @return the number of worker threads 2575 */ 2576 public int getPoolSize() { 2577 return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT)); 2578 } 2579 2580 /** 2581 * Returns {@code true} if this pool uses local first-in-first-out 2582 * scheduling mode for forked tasks that are never joined. 2583 * 2584 * @return {@code true} if this pool uses async mode 2585 */ 2586 public boolean getAsyncMode() { 2587 return (mode & FIFO) != 0; 2588 } 2589 2590 /** 2591 * Returns an estimate of the number of worker threads that are 2592 * not blocked waiting to join tasks or for other managed 2593 * synchronization. This method may overestimate the 2594 * number of running threads. 2595 * 2596 * @return the number of worker threads 2597 */ 2598 public int getRunningThreadCount() { 2599 WorkQueue[] ws; WorkQueue w; 2600 VarHandle.acquireFence(); 2601 int rc = 0; 2602 if ((ws = workQueues) != null) { 2603 for (int i = 1; i < ws.length; i += 2) { 2604 if ((w = ws[i]) != null && w.isApparentlyUnblocked()) 2605 ++rc; 2606 } 2607 } 2608 return rc; 2609 } 2610 2611 /** 2612 * Returns an estimate of the number of threads that are currently 2613 * stealing or executing tasks. This method may overestimate the 2614 * number of active threads. 2615 * 2616 * @return the number of active threads 2617 */ 2618 public int getActiveThreadCount() { 2619 int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT); 2620 return (r <= 0) ? 0 : r; // suppress momentarily negative values 2621 } 2622 2623 /** 2624 * Returns {@code true} if all worker threads are currently idle. 2625 * An idle worker is one that cannot obtain a task to execute 2626 * because none are available to steal from other threads, and 2627 * there are no pending submissions to the pool. This method is 2628 * conservative; it might not return {@code true} immediately upon 2629 * idleness of all threads, but will eventually become true if 2630 * threads remain inactive. 
2631 * 2632 * @return {@code true} if all threads are currently idle 2633 */ 2634 public boolean isQuiescent() { 2635 for (;;) { 2636 long c = ctl; 2637 int md = mode, pc = md & SMASK; 2638 int tc = pc + (short)(c >>> TC_SHIFT); 2639 int rc = pc + (int)(c >> RC_SHIFT); 2640 if ((md & (STOP | TERMINATED)) != 0) 2641 return true; 2642 else if (rc > 0) 2643 return false; 2644 else { 2645 WorkQueue[] ws; WorkQueue v; 2646 if ((ws = workQueues) != null) { 2647 for (int i = 1; i < ws.length; i += 2) { 2648 if ((v = ws[i]) != null) { 2649 if (v.source > 0) 2650 return false; 2651 --tc; 2652 } 2653 } 2654 } 2655 if (tc == 0 && ctl == c) 2656 return true; 2657 } 2658 } 2659 } 2660 2661 /** 2662 * Returns an estimate of the total number of tasks stolen from 2663 * one thread's work queue by another. The reported value 2664 * underestimates the actual total number of steals when the pool 2665 * is not quiescent. This value may be useful for monitoring and 2666 * tuning fork/join programs: in general, steal counts should be 2667 * high enough to keep threads busy, but low enough to avoid 2668 * overhead and contention across threads. 2669 * 2670 * @return the number of steals 2671 */ 2672 public long getStealCount() { 2673 long count = stealCount; 2674 WorkQueue[] ws; WorkQueue w; 2675 if ((ws = workQueues) != null) { 2676 for (int i = 1; i < ws.length; i += 2) { 2677 if ((w = ws[i]) != null) 2678 count += (long)w.nsteals & 0xffffffffL; 2679 } 2680 } 2681 return count; 2682 } 2683 2684 /** 2685 * Returns an estimate of the total number of tasks currently held 2686 * in queues by worker threads (but not including tasks submitted 2687 * to the pool that have not begun executing). This value is only 2688 * an approximation, obtained by iterating across all threads in 2689 * the pool. This method may be useful for tuning task 2690 * granularities. 
2691 * 2692 * @return the number of queued tasks 2693 */ 2694 public long getQueuedTaskCount() { 2695 WorkQueue[] ws; WorkQueue w; 2696 VarHandle.acquireFence(); 2697 int count = 0; 2698 if ((ws = workQueues) != null) { 2699 for (int i = 1; i < ws.length; i += 2) { 2700 if ((w = ws[i]) != null) 2701 count += w.queueSize(); 2702 } 2703 } 2704 return count; 2705 } 2706 2707 /** 2708 * Returns an estimate of the number of tasks submitted to this 2709 * pool that have not yet begun executing. This method may take 2710 * time proportional to the number of submissions. 2711 * 2712 * @return the number of queued submissions 2713 */ 2714 public int getQueuedSubmissionCount() { 2715 WorkQueue[] ws; WorkQueue w; 2716 VarHandle.acquireFence(); 2717 int count = 0; 2718 if ((ws = workQueues) != null) { 2719 for (int i = 0; i < ws.length; i += 2) { 2720 if ((w = ws[i]) != null) 2721 count += w.queueSize(); 2722 } 2723 } 2724 return count; 2725 } 2726 2727 /** 2728 * Returns {@code true} if there are any tasks submitted to this 2729 * pool that have not yet begun executing. 2730 * 2731 * @return {@code true} if there are any queued submissions 2732 */ 2733 public boolean hasQueuedSubmissions() { 2734 WorkQueue[] ws; WorkQueue w; 2735 VarHandle.acquireFence(); 2736 if ((ws = workQueues) != null) { 2737 for (int i = 0; i < ws.length; i += 2) { 2738 if ((w = ws[i]) != null && !w.isEmpty()) 2739 return true; 2740 } 2741 } 2742 return false; 2743 } 2744 2745 /** 2746 * Removes and returns the next unexecuted submission if one is 2747 * available. This method may be useful in extensions to this 2748 * class that re-assign work in systems with multiple pools. 
2749 * 2750 * @return the next submission, or {@code null} if none 2751 */ 2752 protected ForkJoinTask<?> pollSubmission() { 2753 return pollScan(true); 2754 } 2755 2756 /** 2757 * Removes all available unexecuted submitted and forked tasks 2758 * from scheduling queues and adds them to the given collection, 2759 * without altering their execution status. These may include 2760 * artificially generated or wrapped tasks. This method is 2761 * designed to be invoked only when the pool is known to be 2762 * quiescent. Invocations at other times may not remove all 2763 * tasks. A failure encountered while attempting to add elements 2764 * to collection {@code c} may result in elements being in 2765 * neither, either or both collections when the associated 2766 * exception is thrown. The behavior of this operation is 2767 * undefined if the specified collection is modified while the 2768 * operation is in progress. 2769 * 2770 * @param c the collection to transfer elements into 2771 * @return the number of elements transferred 2772 */ 2773 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 2774 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; 2775 VarHandle.acquireFence(); 2776 int count = 0; 2777 if ((ws = workQueues) != null) { 2778 for (int i = 0; i < ws.length; ++i) { 2779 if ((w = ws[i]) != null) { 2780 while ((t = w.poll()) != null) { 2781 c.add(t); 2782 ++count; 2783 } 2784 } 2785 } 2786 } 2787 return count; 2788 } 2789 2790 /** 2791 * Returns a string identifying this pool, as well as its state, 2792 * including indications of run state, parallelism level, and 2793 * worker and task counts. 
2794 * 2795 * @return a string identifying this pool, as well as its state 2796 */ 2797 public String toString() { 2798 // Use a single pass through workQueues to collect counts 2799 int md = mode; // read volatile fields first 2800 long c = ctl; 2801 long st = stealCount; 2802 long qt = 0L, qs = 0L; int rc = 0; 2803 WorkQueue[] ws; WorkQueue w; 2804 if ((ws = workQueues) != null) { 2805 for (int i = 0; i < ws.length; ++i) { 2806 if ((w = ws[i]) != null) { 2807 int size = w.queueSize(); 2808 if ((i & 1) == 0) 2809 qs += size; 2810 else { 2811 qt += size; 2812 st += (long)w.nsteals & 0xffffffffL; 2813 if (w.isApparentlyUnblocked()) 2814 ++rc; 2815 } 2816 } 2817 } 2818 } 2819 2820 int pc = (md & SMASK); 2821 int tc = pc + (short)(c >>> TC_SHIFT); 2822 int ac = pc + (int)(c >> RC_SHIFT); 2823 if (ac < 0) // ignore transient negative 2824 ac = 0; 2825 String level = ((md & TERMINATED) != 0 ? "Terminated" : 2826 (md & STOP) != 0 ? "Terminating" : 2827 (md & SHUTDOWN) != 0 ? "Shutting down" : 2828 "Running"); 2829 return super.toString() + 2830 "[" + level + 2831 ", parallelism = " + pc + 2832 ", size = " + tc + 2833 ", active = " + ac + 2834 ", running = " + rc + 2835 ", steals = " + st + 2836 ", tasks = " + qt + 2837 ", submissions = " + qs + 2838 "]"; 2839 } 2840 2841 /** 2842 * Possibly initiates an orderly shutdown in which previously 2843 * submitted tasks are executed, but no new tasks will be 2844 * accepted. Invocation has no effect on execution state if this 2845 * is the {@link #commonPool()}, and no additional effect if 2846 * already shut down. Tasks that are in the process of being 2847 * submitted concurrently during the course of this method may or 2848 * may not be rejected. 
2849 * 2850 * @throws SecurityException if a security manager exists and 2851 * the caller is not permitted to modify threads 2852 * because it does not hold {@link 2853 * java.lang.RuntimePermission}{@code ("modifyThread")} 2854 */ 2855 public void shutdown() { 2856 checkPermission(); 2857 tryTerminate(false, true); 2858 } 2859 2860 /** 2861 * Possibly attempts to cancel and/or stop all tasks, and reject 2862 * all subsequently submitted tasks. Invocation has no effect on 2863 * execution state if this is the {@link #commonPool()}, and no 2864 * additional effect if already shut down. Otherwise, tasks that 2865 * are in the process of being submitted or executed concurrently 2866 * during the course of this method may or may not be 2867 * rejected. This method cancels both existing and unexecuted 2868 * tasks, in order to permit termination in the presence of task 2869 * dependencies. So the method always returns an empty list 2870 * (unlike the case for some other Executors). 2871 * 2872 * @return an empty list 2873 * @throws SecurityException if a security manager exists and 2874 * the caller is not permitted to modify threads 2875 * because it does not hold {@link 2876 * java.lang.RuntimePermission}{@code ("modifyThread")} 2877 */ 2878 public List<Runnable> shutdownNow() { 2879 checkPermission(); 2880 tryTerminate(true, true); 2881 return Collections.emptyList(); 2882 } 2883 2884 /** 2885 * Returns {@code true} if all tasks have completed following shut down. 2886 * 2887 * @return {@code true} if all tasks have completed following shut down 2888 */ 2889 public boolean isTerminated() { 2890 return (mode & TERMINATED) != 0; 2891 } 2892 2893 /** 2894 * Returns {@code true} if the process of termination has 2895 * commenced but not yet completed. This method may be useful for 2896 * debugging. 
A return of {@code true} reported a sufficient 2897 * period after shutdown may indicate that submitted tasks have 2898 * ignored or suppressed interruption, or are waiting for I/O, 2899 * causing this executor not to properly terminate. (See the 2900 * advisory notes for class {@link ForkJoinTask} stating that 2901 * tasks should not normally entail blocking operations. But if 2902 * they do, they must abort them on interrupt.) 2903 * 2904 * @return {@code true} if terminating but not yet terminated 2905 */ 2906 public boolean isTerminating() { 2907 int md = mode; 2908 return (md & STOP) != 0 && (md & TERMINATED) == 0; 2909 } 2910 2911 /** 2912 * Returns {@code true} if this pool has been shut down. 2913 * 2914 * @return {@code true} if this pool has been shut down 2915 */ 2916 public boolean isShutdown() { 2917 return (mode & SHUTDOWN) != 0; 2918 } 2919 2920 /** 2921 * Blocks until all tasks have completed execution after a 2922 * shutdown request, or the timeout occurs, or the current thread 2923 * is interrupted, whichever happens first. Because the {@link 2924 * #commonPool()} never terminates until program shutdown, when 2925 * applied to the common pool, this method is equivalent to {@link 2926 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 
2927 * 2928 * @param timeout the maximum time to wait 2929 * @param unit the time unit of the timeout argument 2930 * @return {@code true} if this executor terminated and 2931 * {@code false} if the timeout elapsed before termination 2932 * @throws InterruptedException if interrupted while waiting 2933 */ 2934 public boolean awaitTermination(long timeout, TimeUnit unit) 2935 throws InterruptedException { 2936 if (Thread.interrupted()) 2937 throw new InterruptedException(); 2938 if (this == common) { 2939 awaitQuiescence(timeout, unit); 2940 return false; 2941 } 2942 long nanos = unit.toNanos(timeout); 2943 if (isTerminated()) 2944 return true; 2945 if (nanos <= 0L) 2946 return false; 2947 long deadline = System.nanoTime() + nanos; 2948 synchronized (this) { 2949 for (;;) { 2950 if (isTerminated()) 2951 return true; 2952 if (nanos <= 0L) 2953 return false; 2954 long millis = TimeUnit.NANOSECONDS.toMillis(nanos); 2955 wait(millis > 0L ? millis : 1L); 2956 nanos = deadline - System.nanoTime(); 2957 } 2958 } 2959 } 2960 2961 /** 2962 * If called by a ForkJoinTask operating in this pool, equivalent 2963 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 2964 * waits and/or attempts to assist performing tasks until this 2965 * pool {@link #isQuiescent} or the indicated timeout elapses. 2966 * 2967 * @param timeout the maximum time to wait 2968 * @param unit the time unit of the timeout argument 2969 * @return {@code true} if quiescent; {@code false} if the 2970 * timeout elapsed. 
2971 */ 2972 public boolean awaitQuiescence(long timeout, TimeUnit unit) { 2973 long nanos = unit.toNanos(timeout); 2974 ForkJoinWorkerThread wt; 2975 Thread thread = Thread.currentThread(); 2976 if ((thread instanceof ForkJoinWorkerThread) && 2977 (wt = (ForkJoinWorkerThread)thread).pool == this) { 2978 helpQuiescePool(wt.workQueue); 2979 return true; 2980 } 2981 else { 2982 for (long startTime = System.nanoTime();;) { 2983 ForkJoinTask<?> t; 2984 if ((t = pollScan(false)) != null) 2985 t.doExec(); 2986 else if (isQuiescent()) 2987 return true; 2988 else if ((System.nanoTime() - startTime) > nanos) 2989 return false; 2990 else 2991 Thread.yield(); // cannot block 2992 } 2993 } 2994 } 2995 2996 /** 2997 * Waits and/or attempts to assist performing tasks indefinitely 2998 * until the {@link #commonPool()} {@link #isQuiescent}. 2999 */ 3000 static void quiesceCommonPool() { 3001 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 3002 } 3003 3004 /** 3005 * Interface for extending managed parallelism for tasks running 3006 * in {@link ForkJoinPool}s. 3007 * 3008 * <p>A {@code ManagedBlocker} provides two methods. Method 3009 * {@link #isReleasable} must return {@code true} if blocking is 3010 * not necessary. Method {@link #block} blocks the current thread 3011 * if necessary (perhaps internally invoking {@code isReleasable} 3012 * before actually blocking). These actions are performed by any 3013 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. 3014 * The unusual methods in this API accommodate synchronizers that 3015 * may, but don't usually, block for long periods. Similarly, they 3016 * allow more efficient internal handling of cases in which 3017 * additional workers may be, but usually are not, needed to 3018 * ensure sufficient parallelism. Toward this end, 3019 * implementations of method {@code isReleasable} must be amenable 3020 * to repeated invocation. 
3021 * 3022 * <p>For example, here is a ManagedBlocker based on a 3023 * ReentrantLock: 3024 * <pre> {@code 3025 * class ManagedLocker implements ManagedBlocker { 3026 * final ReentrantLock lock; 3027 * boolean hasLock = false; 3028 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 3029 * public boolean block() { 3030 * if (!hasLock) 3031 * lock.lock(); 3032 * return true; 3033 * } 3034 * public boolean isReleasable() { 3035 * return hasLock || (hasLock = lock.tryLock()); 3036 * } 3037 * }}</pre> 3038 * 3039 * <p>Here is a class that possibly blocks waiting for an 3040 * item on a given queue: 3041 * <pre> {@code 3042 * class QueueTaker<E> implements ManagedBlocker { 3043 * final BlockingQueue<E> queue; 3044 * volatile E item = null; 3045 * QueueTaker(BlockingQueue<E> q) { this.queue = q; } 3046 * public boolean block() throws InterruptedException { 3047 * if (item == null) 3048 * item = queue.take(); 3049 * return true; 3050 * } 3051 * public boolean isReleasable() { 3052 * return item != null || (item = queue.poll()) != null; 3053 * } 3054 * public E getItem() { // call after pool.managedBlock completes 3055 * return item; 3056 * } 3057 * }}</pre> 3058 */ 3059 public static interface ManagedBlocker { 3060 /** 3061 * Possibly blocks the current thread, for example waiting for 3062 * a lock or condition. 3063 * 3064 * @return {@code true} if no additional blocking is necessary 3065 * (i.e., if isReleasable would return true) 3066 * @throws InterruptedException if interrupted while waiting 3067 * (the method is not required to do so, but is allowed to) 3068 */ 3069 boolean block() throws InterruptedException; 3070 3071 /** 3072 * Returns {@code true} if blocking is unnecessary. 3073 * @return {@code true} if blocking is unnecessary 3074 */ 3075 boolean isReleasable(); 3076 } 3077 3078 /** 3079 * Runs the given possibly blocking task. 
When {@linkplain 3080 * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this 3081 * method possibly arranges for a spare thread to be activated if 3082 * necessary to ensure sufficient parallelism while the current 3083 * thread is blocked in {@link ManagedBlocker#block blocker.block()}. 3084 * 3085 * <p>This method repeatedly calls {@code blocker.isReleasable()} and 3086 * {@code blocker.block()} until either method returns {@code true}. 3087 * Every call to {@code blocker.block()} is preceded by a call to 3088 * {@code blocker.isReleasable()} that returned {@code false}. 3089 * 3090 * <p>If not running in a ForkJoinPool, this method is 3091 * behaviorally equivalent to 3092 * <pre> {@code 3093 * while (!blocker.isReleasable()) 3094 * if (blocker.block()) 3095 * break;}</pre> 3096 * 3097 * If running in a ForkJoinPool, the pool may first be expanded to 3098 * ensure sufficient parallelism available during the call to 3099 * {@code blocker.block()}. 3100 * 3101 * @param blocker the blocker task 3102 * @throws InterruptedException if {@code blocker.block()} did so 3103 */ 3104 public static void managedBlock(ManagedBlocker blocker) 3105 throws InterruptedException { 3106 if (blocker == null) throw new NullPointerException(); 3107 ForkJoinPool p; 3108 ForkJoinWorkerThread wt; 3109 WorkQueue w; 3110 Thread t = Thread.currentThread(); 3111 if ((t instanceof ForkJoinWorkerThread) && 3112 (p = (wt = (ForkJoinWorkerThread)t).pool) != null && 3113 (w = wt.workQueue) != null) { 3114 int block; 3115 while (!blocker.isReleasable()) { 3116 if ((block = p.tryCompensate(w)) != 0) { 3117 try { 3118 do {} while (!blocker.isReleasable() && 3119 !blocker.block()); 3120 } finally { 3121 CTL.getAndAdd(p, (block > 0) ? 
RC_UNIT : 0L); 3122 } 3123 break; 3124 } 3125 } 3126 } 3127 else { 3128 do {} while (!blocker.isReleasable() && 3129 !blocker.block()); 3130 } 3131 } 3132 3133 /** 3134 * If the given executor is a ForkJoinPool, poll and execute 3135 * AsynchronousCompletionTasks from worker's queue until none are 3136 * available or blocker is released. 3137 */ 3138 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { 3139 if (e instanceof ForkJoinPool) { 3140 WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n; 3141 ForkJoinPool p = (ForkJoinPool)e; 3142 Thread thread = Thread.currentThread(); 3143 if (thread instanceof ForkJoinWorkerThread && 3144 (wt = (ForkJoinWorkerThread)thread).pool == p) 3145 w = wt.workQueue; 3146 else if ((r = ThreadLocalRandom.getProbe()) != 0 && 3147 (ws = p.workQueues) != null && (n = ws.length) > 0) 3148 w = ws[(n - 1) & r & SQMASK]; 3149 else 3150 w = null; 3151 if (w != null) 3152 w.helpAsyncBlocker(blocker); 3153 } 3154 } 3155 3156 // AbstractExecutorService overrides. These rely on undocumented 3157 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also 3158 // implement RunnableFuture. 
3159 3160 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 3161 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value); 3162 } 3163 3164 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 3165 return new ForkJoinTask.AdaptedCallable<T>(callable); 3166 } 3167 3168 // VarHandle mechanics 3169 private static final VarHandle CTL; 3170 private static final VarHandle MODE; 3171 static final VarHandle QA; 3172 3173 static { 3174 try { 3175 MethodHandles.Lookup l = MethodHandles.lookup(); 3176 CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class); 3177 MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class); 3178 QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class); 3179 } catch (ReflectiveOperationException e) { 3180 throw new ExceptionInInitializerError(e); 3181 } 3182 3183 // Reduce the risk of rare disastrous classloading in first call to 3184 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 3185 Class<?> ensureLoaded = LockSupport.class; 3186 3187 int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES; 3188 try { 3189 String p = System.getProperty 3190 ("java.util.concurrent.ForkJoinPool.common.maximumSpares"); 3191 if (p != null) 3192 commonMaxSpares = Integer.parseInt(p); 3193 } catch (Exception ignore) {} 3194 COMMON_MAX_SPARES = commonMaxSpares; 3195 3196 defaultForkJoinWorkerThreadFactory = 3197 new DefaultForkJoinWorkerThreadFactory(); 3198 modifyThreadPermission = new RuntimePermission("modifyThread"); 3199 3200 common = AccessController.doPrivileged(new PrivilegedAction<>() { 3201 public ForkJoinPool run() { 3202 return new ForkJoinPool((byte)0); }}); 3203 3204 COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1); 3205 } 3206 3207 /** 3208 * Factory for innocuous worker threads. 3209 */ 3210 private static final class InnocuousForkJoinWorkerThreadFactory 3211 implements ForkJoinWorkerThreadFactory { 3212 3213 /** 3214 * An ACC to restrict permissions for the factory itself. 
3215 * The constructed workers have no permissions set. 3216 */ 3217 private static final AccessControlContext ACC = contextWithPermissions( 3218 modifyThreadPermission, 3219 new RuntimePermission("enableContextClassLoaderOverride"), 3220 new RuntimePermission("modifyThreadGroup"), 3221 new RuntimePermission("getClassLoader"), 3222 new RuntimePermission("setContextClassLoader")); 3223 3224 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 3225 return AccessController.doPrivileged( 3226 new PrivilegedAction<>() { 3227 public ForkJoinWorkerThread run() { 3228 return new ForkJoinWorkerThread. 3229 InnocuousForkJoinWorkerThread(pool); }}, 3230 ACC); 3231 } 3232 } 3233 } 3234