1 /* 2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.scheduling 6 7 import kotlinx.atomicfu.* 8 import kotlinx.coroutines.* 9 import kotlinx.coroutines.internal.* 10 import java.util.concurrent.* 11 import kotlin.coroutines.* 12 13 /** 14 * Default instance of coroutine dispatcher. 15 */ 16 internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { 17 val IO: CoroutineDispatcher = LimitingDispatcher( 18 this, 19 systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), 20 "Dispatchers.IO", 21 TASK_PROBABLY_BLOCKING 22 ) 23 closenull24 override fun close() { 25 throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed") 26 } 27 toStringnull28 override fun toString(): String = DEFAULT_DISPATCHER_NAME 29 30 @InternalCoroutinesApi 31 @Suppress("UNUSED") 32 public fun toDebugString(): String = super.toString() 33 } 34 35 /** 36 * @suppress **This is unstable API and it is subject to change.** 37 */ 38 // TODO make internal (and rename) after complete integration 39 @InternalCoroutinesApi 40 public open class ExperimentalCoroutineDispatcher( 41 private val corePoolSize: Int, 42 private val maxPoolSize: Int, 43 private val idleWorkerKeepAliveNs: Long, 44 private val schedulerName: String = "CoroutineScheduler" 45 ) : ExecutorCoroutineDispatcher() { 46 public constructor( 47 corePoolSize: Int = CORE_POOL_SIZE, 48 maxPoolSize: Int = MAX_POOL_SIZE, 49 schedulerName: String = DEFAULT_SCHEDULER_NAME 50 ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName) 51 52 @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN) 53 public constructor( 54 corePoolSize: Int = CORE_POOL_SIZE, 55 maxPoolSize: Int = MAX_POOL_SIZE 56 ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS) 57 58 override val executor: Executor 59 get() = coroutineScheduler 60 61 // This is variable for test purposes, so that we can reinitialize from clean state 62 private var coroutineScheduler = createScheduler() 63 64 override fun dispatch(context: CoroutineContext, block: Runnable): Unit = 65 try { 66 coroutineScheduler.dispatch(block) 67 } catch (e: RejectedExecutionException) { 68 // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved 69 // for testing purposes, so we don't have to worry about cancelling the affected Job here. 70 DefaultExecutor.dispatch(context, block) 71 } 72 73 override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = 74 try { 75 coroutineScheduler.dispatch(block, tailDispatch = true) 76 } catch (e: RejectedExecutionException) { 77 // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved 78 // for testing purposes, so we don't have to worry about cancelling the affected Job here. 79 DefaultExecutor.dispatchYield(context, block) 80 } 81 82 override fun close(): Unit = coroutineScheduler.close() 83 84 override fun toString(): String { 85 return "${super.toString()}[scheduler = $coroutineScheduler]" 86 } 87 88 /** 89 * Creates a coroutine execution context with limited parallelism to execute tasks which may potentially block. 90 * Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher], 91 * giving it additional hints to adjust its behaviour. 92 * 93 * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel. 94 */ 95 public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher { 96 require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" } 97 return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING) 98 } 99 100 /** 101 * Creates a coroutine execution context with limited parallelism to execute CPU-intensive tasks. 102 * Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher], 103 * giving it additional hints to adjust its behaviour. 104 * 105 * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel. 106 */ 107 public fun limited(parallelism: Int): CoroutineDispatcher { 108 require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" } 109 require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" } 110 return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING) 111 } 112 113 internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) { 114 try { 115 coroutineScheduler.dispatch(block, context, tailDispatch) 116 } catch (e: RejectedExecutionException) { 117 // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved 118 // for testing purposes, so we don't have to worry about cancelling the affected Job here. 119 // TaskContext shouldn't be lost here to properly invoke before/after task 120 DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context)) 121 } 122 } 123 124 private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) 125 126 // fot tests only 127 @Synchronized 128 internal fun usePrivateScheduler() { 129 coroutineScheduler.shutdown(1_000L) 130 coroutineScheduler = createScheduler() 131 } 132 133 // for tests only 134 @Synchronized 135 internal fun shutdown(timeout: Long) { 136 coroutineScheduler.shutdown(timeout) 137 } 138 139 // for tests only 140 internal fun restore() = usePrivateScheduler() // recreate scheduler 141 } 142 143 private class LimitingDispatcher( 144 private val dispatcher: ExperimentalCoroutineDispatcher, 145 private val parallelism: Int, 146 private val name: String?, 147 override val taskMode: Int 148 ) : ExecutorCoroutineDispatcher(), TaskContext, Executor { 149 150 private val queue = ConcurrentLinkedQueue<Runnable>() 151 private val inFlightTasks = atomic(0) 152 153 override val executor: Executor 154 get() = this 155 executenull156 override fun execute(command: Runnable) = dispatch(command, false) 157 158 override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher") 159 160 override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false) 161 162 private fun dispatch(block: Runnable, tailDispatch: Boolean) { 163 var taskToSchedule = block 164 while (true) { 165 // Commit in-flight tasks slot 166 val inFlight = inFlightTasks.incrementAndGet() 167 168 // Fast path, if parallelism limit is not reached, dispatch task and return 169 if (inFlight <= parallelism) { 170 dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch) 171 return 172 } 173 174 // Parallelism limit is reached, add task to the queue 175 queue.add(taskToSchedule) 176 177 /* 178 * We're not actually scheduled anything, so rollback committed in-flight task slot: 179 * If the amount of in-flight tasks is still above the limit, do nothing 180 * If the amount of in-flight tasks is lesser than parallelism, then 181 * it's a race with a thread which finished the task from the current context, we should resubmit the first task from the queue 182 * to avoid starvation. 183 * 184 * Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential: 185 * 186 * T1: submit task, start execution, R == 1 187 * T2: commit slot for next task, R == 2 188 * T1: finish T1, R == 1 189 * T2: submit next task to local queue, decrement R, R == 0 190 * Without retries, task from T2 will be stuck in the local queue 191 */ 192 if (inFlightTasks.decrementAndGet() >= parallelism) { 193 return 194 } 195 196 taskToSchedule = queue.poll() ?: return 197 } 198 } 199 dispatchYieldnull200 override fun dispatchYield(context: CoroutineContext, block: Runnable) { 201 dispatch(block, tailDispatch = true) 202 } 203 toStringnull204 override fun toString(): String { 205 return name ?: "${super.toString()}[dispatcher = $dispatcher]" 206 } 207 208 /** 209 * Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any. 210 * 211 * Implementation note: blocking tasks are scheduled in a fair manner (to local queue tail) to avoid 212 * non-blocking continuations starvation. 213 * E.g. for 214 * ``` 215 * foo() 216 * blocking() 217 * bar() 218 * ``` 219 * it's more profitable to execute bar at the end of `blocking` rather than pending blocking task 220 */ afterTasknull221 override fun afterTask() { 222 var next = queue.poll() 223 // If we have pending tasks in current blocking context, dispatch first 224 if (next != null) { 225 dispatcher.dispatchWithContext(next, this, true) 226 return 227 } 228 inFlightTasks.decrementAndGet() 229 230 /* 231 * Re-poll again and try to submit task if it's required otherwise tasks may be stuck in the local queue. 232 * Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential: 233 * T1: submit task, start execution, R == 1 234 * T2: commit slot for next task, R == 2 235 * T1: finish T1, poll queue (it's still empty), R == 2 236 * T2: submit next task to the local queue, decrement R, R == 1 237 * T1: decrement R, finish. R == 0 238 * 239 * The task from T2 is stuck is the local queue 240 */ 241 next = queue.poll() ?: return 242 dispatch(next, true) 243 } 244 } 245