• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.scheduling
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.internal.*
9 import java.util.concurrent.*
10 import kotlin.coroutines.*
11 
/**
 * Instance of `Dispatchers.Default`.
 *
 * A [SchedulerCoroutineDispatcher] constructed with [CORE_POOL_SIZE], [MAX_POOL_SIZE],
 * [IDLE_WORKER_KEEP_ALIVE_NS] and [DEFAULT_SCHEDULER_NAME].
 */
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE,
    MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS,
    DEFAULT_SCHEDULER_NAME
) {

    /**
     * Validates [parallelism] and then picks the resulting dispatcher:
     * requests of at least [CORE_POOL_SIZE] get this dispatcher back unchanged,
     * smaller requests are delegated to the superclass implementation.
     */
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism < CORE_POOL_SIZE) super.limitedParallelism(parallelism) else this
    }

    /**
     * Shuts down the dispatcher by invoking the superclass `close()`.
     * Used only by `Dispatchers.shutdown()`.
     */
    internal fun shutdown(): Unit = super.close()

    /**
     * Always throws [UnsupportedOperationException].
     *
     * Overridden in case anyone writes `(Dispatchers.Default as ExecutorCoroutineDispatcher).close()`.
     */
    override fun close(): Unit =
        throw UnsupportedOperationException("Dispatchers.Default cannot be closed")

    override fun toString(): String {
        return "Dispatchers.Default"
    }
}
37 
/**
 * The unlimited instance of `Dispatchers.IO` that utilizes all the threads `CoroutineScheduler` provides.
 *
 * Both dispatch entry points forward the task to [DefaultScheduler] together with [BlockingContext];
 * the incoming coroutine context is not used.
 */
private object UnlimitedIoScheduler : CoroutineDispatcher() {

    /** Hands [block] to [DefaultScheduler] with [BlockingContext] as a regular (non-tail) dispatch. */
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        DefaultScheduler.dispatchWithContext(block, BlockingContext, tailDispatch = false)

    /** Same as [dispatch], but requests a tail dispatch from [DefaultScheduler]. */
    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        DefaultScheduler.dispatchWithContext(block, BlockingContext, tailDispatch = true)

    /**
     * Validates [parallelism] and then picks the resulting dispatcher:
     * requests of at least [MAX_POOL_SIZE] get this dispatcher back unchanged,
     * smaller requests are delegated to the superclass implementation.
     */
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism < MAX_POOL_SIZE) super.limitedParallelism(parallelism) else this
    }
}
57 
/**
 * `Dispatchers.IO`.
 *
 * Acts both as an [ExecutorCoroutineDispatcher] and as its own [Executor]. Tasks are forwarded to
 * a limited-parallelism view of [UnlimitedIoScheduler] created once when this object is initialized.
 */
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    // Parallelism limit comes from systemProp(IO_PARALLELISM_PROPERTY_NAME, ...), whose default
    // argument is the larger of 64 and AVAILABLE_PROCESSORS.
    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(IO_PARALLELISM_PROPERTY_NAME, maxOf(64, AVAILABLE_PROCESSORS))
    )

    /** This object is its own executor. */
    override val executor: Executor get() = this

    /** [Executor] entry point: dispatches [command] with an [EmptyCoroutineContext]. */
    override fun execute(command: java.lang.Runnable) {
        dispatch(EmptyCoroutineContext, command)
    }

    /**
     * Creates the view from [UnlimitedIoScheduler] rather than from the default limited view.
     * See documentation to Dispatchers.IO for the rationale.
     */
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
        UnlimitedIoScheduler.limitedParallelism(parallelism)

    /** Forwards [block] to the default limited view. */
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        default.dispatch(context, block)

    /** Forwards the yield-dispatch of [block] to the default limited view. */
    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        default.dispatchYield(context, block)

    /** Always fails via `error(...)`: closing `Dispatchers.IO` is not supported. */
    override fun close(): Unit = error("Cannot be invoked on Dispatchers.IO")

    override fun toString(): String {
        return "Dispatchers.IO"
    }
}
94 
/**
 * An [ExecutorCoroutineDispatcher] backed by a [CoroutineScheduler] built from the constructor arguments.
 *
 * Instantiated in tests so we can test it in isolation.
 *
 * @param corePoolSize passed to the [CoroutineScheduler] constructor
 * @param maxPoolSize passed to the [CoroutineScheduler] constructor
 * @param idleWorkerKeepAliveNs passed to the [CoroutineScheduler] constructor
 * @param schedulerName passed to the [CoroutineScheduler] constructor
 */
internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {

    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = newScheduler()

    /** The current underlying scheduler, exposed as an [Executor]. */
    override val executor: Executor get() = coroutineScheduler

    // Builds a scheduler from the constructor arguments.
    private fun newScheduler(): CoroutineScheduler =
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

    /** Submits [block] to the underlying scheduler; [context] is not used. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block)
    }

    /** Submits [block] to the underlying scheduler as a tail dispatch; [context] is not used. */
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block, tailDispatch = true)
    }

    /** Submits [block] to the underlying scheduler with an explicit task [context] and [tailDispatch] flag. */
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean): Unit =
        coroutineScheduler.dispatch(block, context, tailDispatch)

    /** Closes the underlying scheduler. */
    override fun close(): Unit = coroutineScheduler.close()

    /**
     * For tests only: shuts the current scheduler down (passing `1_000L` as the timeout argument)
     * and replaces it with a freshly created one.
     */
    @Synchronized
    internal fun usePrivateScheduler() {
        coroutineScheduler.shutdown(1_000L)
        coroutineScheduler = newScheduler()
    }

    /** For tests only: shuts the current scheduler down, passing [timeout] through to it. */
    @Synchronized
    internal fun shutdown(timeout: Long): Unit = coroutineScheduler.shutdown(timeout)

    /** For tests only: recreates the scheduler by delegating to [usePrivateScheduler]. */
    internal fun restore() {
        usePrivateScheduler()
    }
}
141