• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.scheduling
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.internal.*
9 import java.util.concurrent.*
10 import kotlin.coroutines.*
11 
/**
 * Scheduler-backed dispatcher that serves as the instance of `Dispatchers.Default`.
 *
 * It is configured with [CORE_POOL_SIZE], [MAX_POOL_SIZE], [IDLE_WORKER_KEEP_ALIVE_NS]
 * and [DEFAULT_SCHEDULER_NAME].
 */
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    corePoolSize = CORE_POOL_SIZE,
    maxPoolSize = MAX_POOL_SIZE,
    idleWorkerKeepAliveNs = IDLE_WORKER_KEEP_ALIVE_NS,
    schedulerName = DEFAULT_SCHEDULER_NAME,
) {
    /**
     * Really shuts the dispatcher down by delegating to the superclass `close()`.
     * Used only by `Dispatchers.shutdown()`.
     */
    internal fun shutdown(): Unit = super.close()

    /**
     * Always fails. Overridden in case anyone writes
     * `(Dispatchers.Default as ExecutorCoroutineDispatcher).close()`.
     *
     * @throws UnsupportedOperationException on every invocation.
     */
    override fun close(): Unit =
        throw UnsupportedOperationException("Dispatchers.Default cannot be closed")

    override fun toString(): String {
        return "Dispatchers.Default"
    }
}
29 
/**
 * The unlimited instance of `Dispatchers.IO` that utilizes all the threads `CoroutineScheduler` provides.
 *
 * Every task is handed to [DefaultScheduler] tagged with [BlockingContext]; the incoming
 * coroutine context is not used.
 */
private object UnlimitedIoScheduler : CoroutineDispatcher() {

    /** Submits [block] to [DefaultScheduler] with [BlockingContext] as a regular (non-tail) dispatch. */
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        DefaultScheduler.dispatchWithContext(block, context = BlockingContext, tailDispatch = false)

    /** Same as [dispatch], but asks the scheduler for a tail dispatch. */
    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        DefaultScheduler.dispatchWithContext(block, context = BlockingContext, tailDispatch = true)
}
42 
/**
 * The instance of `Dispatchers.IO`.
 *
 * Dispatching is forwarded to a limited-parallelism view of [UnlimitedIoScheduler]. The limit is read
 * from the [IO_PARALLELISM_PROPERTY_NAME] system property and defaults to the larger of 64 and
 * [AVAILABLE_PROCESSORS].
 */
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    // Default view used by dispatch/dispatchYield; created once when the object is initialized
    private val default: CoroutineDispatcher = UnlimitedIoScheduler.limitedParallelism(
        systemProp(IO_PARALLELISM_PROPERTY_NAME, AVAILABLE_PROCESSORS.coerceAtLeast(64))
    )

    /** This dispatcher acts as its own [Executor]. */
    override val executor: Executor get() = this

    /** [Executor] entry point: dispatches [command] with an [EmptyCoroutineContext]. */
    override fun execute(command: java.lang.Runnable) {
        dispatch(EmptyCoroutineContext, command)
    }

    // Limited views are derived from the unlimited scheduler rather than from this dispatcher;
    // see documentation to Dispatchers.IO for the rationale
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
        UnlimitedIoScheduler.limitedParallelism(parallelism)

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        default.dispatch(context, block)

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        default.dispatchYield(context, block)

    /** Always fails with the error below; closing `Dispatchers.IO` is not supported. */
    override fun close(): Unit = error("Cannot be invoked on Dispatchers.IO")

    override fun toString(): String {
        return "Dispatchers.IO"
    }
}
79 
/**
 * Coroutine dispatcher that forwards all work to a [CoroutineScheduler] built from the
 * constructor parameters.
 *
 * Instantiated in tests so we can test it in isolation.
 */
internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {

    // Mutable for test purposes only, so that the scheduler can be reinitialized from a clean state
    private var coroutineScheduler: CoroutineScheduler = createScheduler()

    /** The scheduler currently backing this dispatcher. */
    override val executor: Executor get() = coroutineScheduler

    /** Builds a fresh scheduler from this dispatcher's constructor parameters. */
    private fun createScheduler(): CoroutineScheduler {
        return CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block)
    }

    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block, tailDispatch = true)
    }

    /** Dispatches [block] to the scheduler with an explicit task [context] and [tailDispatch] flag. */
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean): Unit =
        coroutineScheduler.dispatch(block, context, tailDispatch)

    override fun close() {
        coroutineScheduler.close()
    }

    // for tests only: shuts the current scheduler down (1000 passed as the timeout) and installs a new one
    @Synchronized
    internal fun usePrivateScheduler() {
        coroutineScheduler.shutdown(1_000L)
        coroutineScheduler = createScheduler()
    }

    // for tests only: shuts the current scheduler down with the given timeout
    @Synchronized
    internal fun shutdown(timeout: Long) {
        coroutineScheduler.shutdown(timeout)
    }

    // for tests only: recreates the scheduler
    internal fun restore() {
        usePrivateScheduler()
    }
}
126