/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.scheduling

import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import java.util.concurrent.*
import kotlin.coroutines.*

// Instance of Dispatchers.Default.
// A SchedulerCoroutineDispatcher configured with CORE_POOL_SIZE, MAX_POOL_SIZE,
// IDLE_WORKER_KEEP_ALIVE_NS and DEFAULT_SCHEDULER_NAME.
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE,
    MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS,
    DEFAULT_SCHEDULER_NAME
) {

    // Validates the requested parallelism first. A request of CORE_POOL_SIZE or more is answered
    // with this dispatcher itself; anything smaller is delegated to the inherited implementation.
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism >= CORE_POOL_SIZE) this else super.limitedParallelism(parallelism)
    }

    // Shuts down the dispatcher through the inherited close(); used only by Dispatchers.shutdown()
    internal fun shutdown(): Unit = super.close()

    // Always throws. Overridden in case anyone writes
    // (Dispatchers.Default as ExecutorCoroutineDispatcher).close()
    override fun close(): Unit =
        throw UnsupportedOperationException("Dispatchers.Default cannot be closed")

    override fun toString(): String {
        return "Dispatchers.Default"
    }
}

// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides.
// Every block is handed to DefaultScheduler together with BlockingContext.
private object UnlimitedIoScheduler : CoroutineDispatcher() {

    // Regular dispatch: forwarded with tailDispatch = false.
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        DefaultScheduler.dispatchWithContext(block, BlockingContext, tailDispatch = false)

    // Yielding dispatch: identical to dispatch() except that tailDispatch = true.
    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        DefaultScheduler.dispatchWithContext(block, BlockingContext, tailDispatch = true)

    // Validates the requested parallelism first. A request of MAX_POOL_SIZE or more is answered
    // with this dispatcher itself; anything smaller is delegated to the inherited implementation.
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism >= MAX_POOL_SIZE) this else super.limitedParallelism(parallelism)
    }
}

// Dispatchers.IO
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    // Limited view of UnlimitedIoScheduler that serves dispatch() and dispatchYield().
    // The limit is whatever systemProp() yields for IO_PARALLELISM_PROPERTY_NAME, with the larger
    // of 64 and AVAILABLE_PROCESSORS passed as the default value.
    private val default: CoroutineDispatcher = run {
        val fallbackParallelism = 64.coerceAtLeast(AVAILABLE_PROCESSORS)
        UnlimitedIoScheduler.limitedParallelism(
            systemProp(IO_PARALLELISM_PROPERTY_NAME, fallbackParallelism)
        )
    }

    // This object is its own Executor; see execute() below.
    override val executor: Executor
        get() = this

    // Executor entry point: dispatches the command with an empty coroutine context.
    override fun execute(command: java.lang.Runnable) {
        dispatch(EmptyCoroutineContext, command)
    }

    // Note that the new view is carved out of UnlimitedIoScheduler, not out of `default`.
    // See documentation to Dispatchers.IO for the rationale
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
        UnlimitedIoScheduler.limitedParallelism(parallelism)

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        default.dispatch(context, block)

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        default.dispatchYield(context, block)

    // Always fails: error() raises an IllegalStateException with the message below.
    override fun close(): Unit = error("Cannot be invoked on Dispatchers.IO")

    override fun toString(): String {
        return "Dispatchers.IO"
    }
}

// Instantiated in tests so we can test it in isolation
internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {

    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()

    // Exposes the current scheduler instance as the executor.
    override val executor: Executor
        get() = coroutineScheduler

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block)
    }

    // Same as dispatch(), but asks the scheduler for a tail dispatch.
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block, tailDispatch = true)
    }

    // Dispatches the block with an explicit task context and tail-dispatch flag.
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean): Unit =
        coroutineScheduler.dispatch(block, context, tailDispatch)

    override fun close(): Unit = coroutineScheduler.close()

    // for tests only
    // Shuts down the current scheduler and replaces it with a freshly created one.
    // NOTE(review): the 1_000L timeout is presumably in milliseconds; confirm against CoroutineScheduler.shutdown
    @Synchronized
    internal fun usePrivateScheduler() {
        coroutineScheduler.shutdown(1_000L)
        coroutineScheduler = createScheduler()
    }

    // for tests only
    // Shuts down the current scheduler with the given timeout, without replacing it.
    @Synchronized
    internal fun shutdown(timeout: Long) {
        coroutineScheduler.shutdown(timeout)
    }

    // for tests only
    internal fun restore() {
        usePrivateScheduler() // recreate scheduler
    }

    // Builds a scheduler from the constructor parameters of this dispatcher.
    private fun createScheduler(): CoroutineScheduler {
        return CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    }
}