• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.scheduling
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.internal.*
10 import java.util.concurrent.*
11 import kotlin.coroutines.*
12 
/**
 * The singleton default coroutine dispatcher.
 *
 * It is an [ExperimentalCoroutineDispatcher] constructed with that class's default arguments and it
 * additionally hosts the [IO] view, which schedules onto the very same dispatcher.
 */
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    /**
     * A [LimitingDispatcher] view over this dispatcher whose tasks are tagged [TASK_PROBABLY_BLOCKING]
     * and which reports itself as `"Dispatchers.IO"`.
     *
     * The parallelism limit is resolved through `systemProp` using [IO_PARALLELISM_PROPERTY_NAME];
     * the second argument (presumably the fallback when the property is absent) is the larger of
     * 64 and [AVAILABLE_PROCESSORS].
     */
    val IO: CoroutineDispatcher = LimitingDispatcher(
        dispatcher = this,
        parallelism = systemProp(IO_PARALLELISM_PROPERTY_NAME, maxOf(64, AVAILABLE_PROCESSORS)),
        name = "Dispatchers.IO",
        taskMode = TASK_PROBABLY_BLOCKING
    )

    /**
     * The default scheduler is never closed; every call fails.
     *
     * @throws UnsupportedOperationException always.
     */
    override fun close(): Unit =
        throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed")

    /** Returns the fixed display name [DEFAULT_DISPATCHER_NAME]. */
    override fun toString(): String {
        return DEFAULT_DISPATCHER_NAME
    }

    /** Exposes the superclass `toString()`, which [toString] hides behind the short name. */
    @InternalCoroutinesApi
    @Suppress("UNUSED")
    public fun toDebugString(): String {
        return super.toString()
    }
}
34 
/**
 * An [ExecutorCoroutineDispatcher] that forwards every task to a [CoroutineScheduler] built from the
 * pool parameters given to the constructor.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
// TODO make internal (and rename) after complete integration
@InternalCoroutinesApi
public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    /** Same as the primary constructor, with the keep-alive fixed to [IDLE_WORKER_KEEP_ALIVE_NS]. */
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)

    /** The underlying scheduler currently in use, exposed as an [Executor]. */
    override val executor: Executor
        get() = coroutineScheduler

    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()

    /**
     * Submits [block] to the scheduler. If the scheduler rejects it, the block is handed to
     * [DefaultExecutor] instead so that it still runs.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.dispatch(context, block)
        }

    /**
     * Same as [dispatch], but submits [block] with `tailDispatch = true`; the rejection fallback is
     * [DefaultExecutor]'s `dispatchYield`.
     */
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block, tailDispatch = true)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.dispatchYield(context, block)
        }

    /** Closes the underlying scheduler. */
    override fun close(): Unit = coroutineScheduler.close()

    override fun toString(): String {
        return "${super.toString()}[scheduler = $coroutineScheduler]"
    }

    /**
     * Creates a coroutine execution context with limited parallelism to execute tasks which may potentially block.
     * Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher],
     * giving it additional hints to adjust its behaviour.
     *
     * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher in parallel.
     * @throws IllegalArgumentException if [parallelism] is not positive.
     */
    public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)
    }

    /**
     * Creates a coroutine execution context with limited parallelism to execute CPU-intensive tasks.
     * Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher],
     * giving it additional hints to adjust its behaviour.
     *
     * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher in parallel.
     * @throws IllegalArgumentException if [parallelism] is not positive or is greater than the core pool size.
     */
    public fun limited(parallelism: Int): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        // A value equal to the core pool size is accepted, so the message states "or equal to" explicitly.
        require(parallelism <= corePoolSize) {
            "Expected parallelism level lesser than or equal to core pool size ($corePoolSize), but have $parallelism"
        }
        return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)
    }

    /**
     * Submits [block] to the scheduler together with its [context]. If the scheduler rejects it, a task
     * carrying the same [context] is created and enqueued on [DefaultExecutor] instead.
     */
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
        try {
            coroutineScheduler.dispatch(block, context, tailDispatch)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            // TaskContext shouldn't be lost here to properly invoke before/after task
            DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
        }
    }

    // Builds a fresh scheduler from the constructor parameters.
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

    // for tests only: shuts the current scheduler down (timeout argument 1_000L) and replaces it with a new one
    @Synchronized
    internal fun usePrivateScheduler() {
        coroutineScheduler.shutdown(1_000L)
        coroutineScheduler = createScheduler()
    }

    // for tests only
    @Synchronized
    internal fun shutdown(timeout: Long) {
        coroutineScheduler.shutdown(timeout)
    }

    // for tests only
    internal fun restore() = usePrivateScheduler() // recreate scheduler
}
142 
/**
 * A view over [dispatcher] that lets at most [parallelism] of its own tasks be submitted to the
 * underlying dispatcher at a time; the rest wait in a local queue and are resubmitted from [afterTask].
 *
 * The instance itself is passed along with every submitted task as its [TaskContext].
 *
 * @param dispatcher the dispatcher that actually runs the tasks.
 * @param parallelism the maximum number of tasks from this view that may be in flight at once.
 * @param name the value returned by [toString], or `null` to use a generated description.
 * @param taskMode the task mode reported through [TaskContext].
 */
private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {

    // Tasks that could not be submitted because the parallelism limit was reached.
    private val queue = ConcurrentLinkedQueue<Runnable>()

    // Number of committed in-flight slots; may transiently exceed `parallelism` (see dispatch).
    private val inFlightTasks = atomic(0)

    override val executor: Executor
        get() = this

    override fun execute(command: Runnable) = dispatch(command, false)

    /**
     * This view owns no resources, so closing it is always an error.
     *
     * @throws IllegalStateException always.
     */
    override fun close(): Unit = error("Close cannot be invoked on LimitingDispatcher")

    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    /**
     * Submits [block] to the underlying dispatcher if a slot is free, otherwise parks it in the local queue.
     */
    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            // Commit in-flight tasks slot
            val inFlight = inFlightTasks.incrementAndGet()

            // Fast path, if parallelism limit is not reached, dispatch task and return
            if (inFlight <= parallelism) {
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }

            // Parallelism limit is reached, add task to the queue
            queue.add(taskToSchedule)

            /*
             * We have not actually scheduled anything, so roll back the committed in-flight task slot:
             * If the amount of in-flight tasks is still at or above the limit, do nothing.
             * If the amount of in-flight tasks is lesser than parallelism, then
             * it's a race with a thread which finished the task from the current context, we should resubmit the first task from the queue
             * to avoid starvation.
             *
             * Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
             *
             * T1: submit task, start execution, R == 1
             * T2: commit slot for next task, R == 2
             * T1: finish T1, R == 1
             * T2: submit next task to local queue, decrement R, R == 0
             * Without retries, task from T2 will be stuck in the local queue
             */
            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return
            }

            // A slot was freed concurrently: retry with the head of the queue, or stop if it was already taken.
            taskToSchedule = queue.poll() ?: return
        }
    }

    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatch(block, tailDispatch = true)
    }

    /** Returns [name] when it was supplied, otherwise a description that includes the underlying dispatcher. */
    override fun toString(): String {
        return name ?: "${super.toString()}[dispatcher = $dispatcher]"
    }

    /**
     * Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any.
     *
     * Implementation note: blocking tasks are scheduled in a fair manner (to local queue tail) to avoid
     * non-blocking continuations starvation.
     * E.g. for
     * ```
     * foo()
     * blocking()
     * bar()
     * ```
     * it's more profitable to execute bar at the end of `blocking` rather than pending blocking task
     */
    override fun afterTask() {
        var next = queue.poll()
        // If we have pending tasks in current blocking context, dispatch first
        // (the finished task's slot is reused, so the in-flight counter is left untouched)
        if (next != null) {
            dispatcher.dispatchWithContext(next, this, true)
            return
        }
        // Nothing was pending: release the slot held by the finished task.
        inFlightTasks.decrementAndGet()

        /*
         * Re-poll again and try to submit task if it's required otherwise tasks may be stuck in the local queue.
         * Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
         * T1: submit task, start execution, R == 1
         * T2: commit slot for next task, R == 2
         * T1: finish T1, poll queue (it's still empty), R == 2
         * T2: submit next task to the local queue, decrement R, R == 1
         * T1: decrement R, finish. R == 0
         *
         * The task from T2 is stuck in the local queue
         */
        next = queue.poll() ?: return
        dispatch(next, true)
    }
}
245