1 /* 2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.scheduling 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.internal.* 9 import java.util.concurrent.* 10 import kotlin.coroutines.* 11 12 // Instance of Dispatchers.Default 13 internal object DefaultScheduler : SchedulerCoroutineDispatcher( 14 CORE_POOL_SIZE, MAX_POOL_SIZE, 15 IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME 16 ) { 17 // Shuts down the dispatcher, used only by Dispatchers.shutdown() shutdownnull18 internal fun shutdown() { 19 super.close() 20 } 21 22 // Overridden in case anyone writes (Dispatchers.Default as ExecutorCoroutineDispatcher).close() closenull23 override fun close() { 24 throw UnsupportedOperationException("Dispatchers.Default cannot be closed") 25 } 26 toStringnull27 override fun toString(): String = "Dispatchers.Default" 28 } 29 30 // The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides 31 private object UnlimitedIoScheduler : CoroutineDispatcher() { 32 33 @InternalCoroutinesApi 34 override fun dispatchYield(context: CoroutineContext, block: Runnable) { 35 DefaultScheduler.dispatchWithContext(block, BlockingContext, true) 36 } 37 38 override fun dispatch(context: CoroutineContext, block: Runnable) { 39 DefaultScheduler.dispatchWithContext(block, BlockingContext, false) 40 } 41 } 42 43 // Dispatchers.IO 44 internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor { 45 46 private val default = UnlimitedIoScheduler.limitedParallelism( 47 systemProp( 48 IO_PARALLELISM_PROPERTY_NAME, 49 64.coerceAtLeast(AVAILABLE_PROCESSORS) 50 ) 51 ) 52 53 override val executor: Executor 54 get() = this 55 executenull56 override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command) 57 58 @ExperimentalCoroutinesApi 59 override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { 60 // See documentation to Dispatchers.IO for the rationale 61 return UnlimitedIoScheduler.limitedParallelism(parallelism) 62 } 63 dispatchnull64 override fun dispatch(context: CoroutineContext, block: Runnable) { 65 default.dispatch(context, block) 66 } 67 68 @InternalCoroutinesApi dispatchYieldnull69 override fun dispatchYield(context: CoroutineContext, block: Runnable) { 70 default.dispatchYield(context, block) 71 } 72 closenull73 override fun close() { 74 error("Cannot be invoked on Dispatchers.IO") 75 } 76 toStringnull77 override fun toString(): String = "Dispatchers.IO" 78 } 79 80 // Instantiated in tests so we can test it in isolation 81 internal open class SchedulerCoroutineDispatcher( 82 private val corePoolSize: Int = CORE_POOL_SIZE, 83 private val maxPoolSize: Int = MAX_POOL_SIZE, 84 private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, 85 private val schedulerName: String = "CoroutineScheduler", 86 ) : ExecutorCoroutineDispatcher() { 87 88 override val executor: Executor 89 get() = coroutineScheduler 90 91 // This is variable for test purposes, so that we can reinitialize from clean state 92 private var coroutineScheduler = createScheduler() 93 94 private fun createScheduler() = 95 CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) 96 97 override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block) 98 99 override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = 100 coroutineScheduler.dispatch(block, tailDispatch = true) 101 102 internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) { 103 coroutineScheduler.dispatch(block, context, tailDispatch) 104 } 105 106 override fun close() { 107 coroutineScheduler.close() 108 } 109 110 // fot tests only 111 @Synchronized 112 internal fun usePrivateScheduler() { 113 coroutineScheduler.shutdown(1_000L) 114 coroutineScheduler = createScheduler() 115 } 116 117 // for tests only 118 @Synchronized 119 internal fun shutdown(timeout: Long) { 120 coroutineScheduler.shutdown(timeout) 121 } 122 123 // for tests only 124 internal fun restore() = usePrivateScheduler() // recreate scheduler 125 } 126