/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*

/**
 * Represents common pool of shared threads as coroutine dispatcher for compute-intensive tasks.
 *
 * If there isn't a SecurityManager present it uses [java.util.concurrent.ForkJoinPool] when available, which
 * implements efficient work-stealing algorithm for its queues, so every coroutine resumption is dispatched as a
 * separate task even when it already executes inside the pool. When available, it wraps `ForkJoinPool.commonPool`
 * and provides a similar shared pool where not.
 *
 * If there is a SecurityManager present (as would be if running inside a Java Web Start context) then a plain thread
 * pool is created. This is to work around the fact that ForkJoinPool creates threads that cannot perform
 * privileged actions.
 */
internal object CommonPool : ExecutorCoroutineDispatcher() {

    /**
     * Name of the property that controls default parallelism level of [CommonPool].
     * If the property is not specified, `Runtime.getRuntime().availableProcessors() - 1` will be used instead
     * (or `1` for single-core JVM).
     * Note that until Java 10, if an application is run within a container,
     * `Runtime.getRuntime().availableProcessors()` is not aware of container constraints and will return
     * the real number of cores.
     */
    public const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism"

    /**
     * The executor backing this dispatcher. It is created on first access if it does not exist yet.
     */
    override val executor: Executor
        get() = pool ?: getOrCreatePoolSync()

    /**
     * Parallelism requested through [DEFAULT_PARALLELISM_PROPERTY_NAME], or `-1` if the property is not set
     * (or could not be read). Fails with [IllegalStateException] when the property is set to anything
     * other than a positive integer.
     */
    private val requestedParallelism = run<Int> {
        val property = Try { System.getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME) } ?: return@run -1
        val parallelism = property.toIntOrNull()
        if (parallelism == null || parallelism < 1) {
            error("Expected positive number in $DEFAULT_PARALLELISM_PROPERTY_NAME, but has $property")
        }
        parallelism
    }

    /**
     * Effective parallelism: the explicitly requested value when it is positive, otherwise the number of
     * available processors minus one, but never less than `1`.
     */
    private val parallelism: Int
        get() = requestedParallelism.takeIf { it > 0 }
            ?: (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)

    // For debug and tests
    private var usePrivatePool = false

    // Read without a lock in `executor` and `dispatch`; written only from @Synchronized functions.
    @Volatile
    private var pool: Executor? = null

    /**
     * Runs [block] and returns its result, or `null` if [block] throws anything at all.
     * Used for best-effort probing (system properties, reflection) where a failure just selects a fallback.
     */
    private inline fun <T> Try(block: () -> T): T? =
        try {
            block()
        } catch (e: Throwable) {
            null
        }

    /**
     * Creates the backing pool, trying the following options in order:
     * 1. a plain fixed thread pool when a SecurityManager is installed or `ForkJoinPool` cannot be loaded;
     * 2. `ForkJoinPool.commonPool()` when parallelism was not requested explicitly, private-pool mode is off
     *    and the common pool passes [isGoodCommonPool];
     * 3. a private `ForkJoinPool` instance with [parallelism] threads;
     * 4. a plain fixed thread pool as the last resort.
     */
    private fun createPool(): ExecutorService {
        if (System.getSecurityManager() != null) return createPlainPool()
        // Look up ForkJoinPool reflectively so that this code still works on JDK 6, where the class is absent
        val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
            ?: return createPlainPool() // Fallback to plain thread pool
        // Try to use commonPool unless parallelism was explicitly specified or in debug privatePool mode
        if (!usePrivatePool && requestedParallelism < 0) {
            Try { fjpClass.getMethod("commonPool").invoke(null) as? ExecutorService }
                ?.takeIf { isGoodCommonPool(fjpClass, it) }
                ?.let { return it }
        }
        // Try to create private ForkJoinPool instance
        Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
            ?.let { return it }
        // Fallback to plain thread pool
        return createPlainPool()
    }

    /**
     * Checks that this ForkJoinPool's parallelism is at least one to avoid pathological bugs.
     *
     * @param fjpClass the reflectively loaded `ForkJoinPool` class, used to look up `getPoolSize`.
     * @param executor the pool instance to probe.
     * @return `true` if the pool reports a pool size of at least one after a task was submitted to it;
     * `false` if the size is zero or cannot be obtained.
     */
    internal fun isGoodCommonPool(fjpClass: Class<*>, executor: ExecutorService): Boolean {
        // We cannot use getParallelism, since it lies to us (always returns at least 1)
        // So we submit a task and check that getPoolSize is at least one after that
        // A broken FJP (that is configured for 0 parallelism) would not execute the task and
        // would report its pool size as zero.
        executor.submit {}
        val actual = Try { fjpClass.getMethod("getPoolSize").invoke(executor) as? Int }
            ?: return false
        return actual >= 1
    }

    /**
     * Creates a fixed-size pool of [parallelism] daemon threads named `CommonPool-worker-N`,
     * where `N` starts at 1.
     */
    private fun createPlainPool(): ExecutorService {
        val threadId = AtomicInteger()
        return Executors.newFixedThreadPool(parallelism) {
            Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
        }
    }

    /**
     * Returns the current pool, creating and publishing it first if it is still absent.
     * The check is repeated here under the object's monitor so that concurrent callers create at most one pool.
     */
    @Synchronized
    private fun getOrCreatePoolSync(): Executor =
        pool ?: createPool().also { pool = it }

    /**
     * Submits [block] to the pool (creating the pool if needed). If the pool rejects the task,
     * the task is handed over to [DefaultExecutor] instead.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            (pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
        } catch (e: RejectedExecutionException) {
            // NOTE(review): presumably undoes the bookkeeping done by wrapTask above; both helpers are declared elsewhere
            unTrackTask()
            // CommonPool only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.enqueue(block)
        }
    }

    /**
     * Used for tests: shuts the current pool down, switches to private-pool mode and clears the pool
     * so that it is re-created on next use.
     */
    @Synchronized
    internal fun usePrivatePool() {
        shutdown(0)
        usePrivatePool = true
        pool = null
    }

    /**
     * Used for tests: shuts down the current pool (if it is an [ExecutorService]), optionally waiting up to
     * [timeout] milliseconds for termination, and re-enqueues tasks that never started into [DefaultExecutor].
     * Afterwards the pool is replaced with an executor that rejects every task.
     *
     * @param timeout how long to wait for termination, in milliseconds; no waiting is done when it is not positive.
     */
    @Synchronized
    internal fun shutdown(timeout: Long) {
        (pool as? ExecutorService)?.apply {
            shutdown()
            if (timeout > 0) awaitTermination(timeout, TimeUnit.MILLISECONDS)
            shutdownNow().forEach { DefaultExecutor.enqueue(it) }
        }
        pool = Executor { throw RejectedExecutionException("CommonPool was shutdown") }
    }

    /**
     * Used for tests: shuts the current pool down, leaves private-pool mode and clears the pool
     * so that it is re-created on next use.
     */
    @Synchronized
    internal fun restore() {
        shutdown(0)
        usePrivatePool = false
        pool = null
    }

    override fun toString(): String = "CommonPool"

    /**
     * Not supported: always fails with [IllegalStateException].
     */
    override fun close(): Unit = error("Close cannot be invoked on CommonPool")
}