1 /* 2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import java.util.concurrent.* 8 import java.util.concurrent.atomic.* 9 import kotlin.coroutines.* 10 11 /** 12 * Represents common pool of shared threads as coroutine dispatcher for compute-intensive tasks. 13 * 14 * If there isn't a SecurityManager present it uses [java.util.concurrent.ForkJoinPool] when available, which implements 15 * efficient work-stealing algorithm for its queues, so every coroutine resumption is dispatched as a separate task even 16 * when it already executes inside the pool. When available, it wraps `ForkJoinPool.commonPool` and provides a similar 17 * shared pool where not. 18 * 19 * If there is a SecurityManager present (as would be if running inside a Java Web Start context) then a plain thread 20 * pool is created. This is to work around the fact that ForkJoinPool creates threads that cannot perform 21 * privileged actions. 22 */ 23 internal object CommonPool : ExecutorCoroutineDispatcher() { 24 25 /** 26 * Name of the property that controls default parallelism level of [CommonPool]. 27 * If the property is not specified, `Runtime.getRuntime().availableProcessors() - 1` will be used instead (or `1` for single-core JVM). 28 * Note that until Java 10, if an application is run within a container, 29 * `Runtime.getRuntime().availableProcessors()` is not aware of container constraints and will return the real number of cores. 30 */ 31 public const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism" 32 33 override val executor: Executor 34 get() = pool ?: getOrCreatePoolSync() 35 36 // Equals to -1 if not explicitly specified <lambda>null37 private val requestedParallelism = run<Int> { 38 val property = Try { System.getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME) } ?: return@run -1 39 val parallelism = property.toIntOrNull() 40 if (parallelism == null || parallelism < 1) { 41 error("Expected positive number in $DEFAULT_PARALLELISM_PROPERTY_NAME, but has $property") 42 } 43 parallelism 44 } 45 46 private val parallelism: Int <lambda>null47 get() = requestedParallelism.takeIf { it > 0 } 48 ?: (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1) 49 50 // For debug and tests 51 private var usePrivatePool = false 52 53 @Volatile 54 private var pool: Executor? = null 55 Trynull56 private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null } 57 createPoolnull58 private fun createPool(): ExecutorService { 59 if (System.getSecurityManager() != null) return createPlainPool() 60 // Reflection on ForkJoinPool class so that it works on JDK 6 (which is absent there) 61 val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") } 62 ?: return createPlainPool() // Fallback to plain thread pool 63 // Try to use commonPool unless parallelism was explicitly specified or in debug privatePool mode 64 if (!usePrivatePool && requestedParallelism < 0) { 65 Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService } 66 ?.takeIf { isGoodCommonPool(fjpClass, it) } 67 ?.let { return it } 68 } 69 // Try to create private ForkJoinPool instance 70 Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService } 71 ?. let { return it } 72 // Fallback to plain thread pool 73 return createPlainPool() 74 } 75 76 /** 77 * Checks that this ForkJoinPool's parallelism is at least one to avoid pathological bugs. 78 */ isGoodCommonPoolnull79 internal fun isGoodCommonPool(fjpClass: Class<*>, executor: ExecutorService): Boolean { 80 // We cannot use getParallelism, since it lies to us (always returns at least 1) 81 // So we submit a task and check that getPoolSize is at least one after that 82 // A broken FJP (that is configured for 0 parallelism) would not execute the task and 83 // would report its pool size as zero. 84 executor.submit {} 85 val actual = Try { fjpClass.getMethod("getPoolSize").invoke(executor) as? Int } 86 ?: return false 87 return actual >= 1 88 } 89 createPlainPoolnull90 private fun createPlainPool(): ExecutorService { 91 val threadId = AtomicInteger() 92 return Executors.newFixedThreadPool(parallelism) { 93 Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true } 94 } 95 } 96 97 @Synchronized getOrCreatePoolSyncnull98 private fun getOrCreatePoolSync(): Executor = 99 pool ?: createPool().also { pool = it } 100 dispatchnull101 override fun dispatch(context: CoroutineContext, block: Runnable) { 102 try { 103 (pool ?: getOrCreatePoolSync()).execute(wrapTask(block)) 104 } catch (e: RejectedExecutionException) { 105 unTrackTask() 106 // CommonPool only rejects execution when it is being closed and this behavior is reserved 107 // for testing purposes, so we don't have to worry about cancelling the affected Job here. 108 DefaultExecutor.enqueue(block) 109 } 110 } 111 112 // used for tests 113 @Synchronized usePrivatePoolnull114 internal fun usePrivatePool() { 115 shutdown(0) 116 usePrivatePool = true 117 pool = null 118 } 119 120 // used for tests 121 @Synchronized shutdownnull122 internal fun shutdown(timeout: Long) { 123 (pool as? ExecutorService)?.apply { 124 shutdown() 125 if (timeout > 0) 126 awaitTermination(timeout, TimeUnit.MILLISECONDS) 127 shutdownNow().forEach { DefaultExecutor.enqueue(it) } 128 } 129 pool = Executor { throw RejectedExecutionException("CommonPool was shutdown") } 130 } 131 132 // used for tests 133 @Synchronized restorenull134 internal fun restore() { 135 shutdown(0) 136 usePrivatePool = false 137 pool = null 138 } 139 toStringnull140 override fun toString(): String = "CommonPool" 141 142 override fun close(): Unit = error("Close cannot be invoked on CommonPool") 143 } 144