package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlin.coroutines.*

/**
 * Base class to be extended by all coroutine dispatcher implementations.
 *
 * If `kotlinx-coroutines` is used, it is recommended to avoid [ContinuationInterceptor] instances that are not
 * [CoroutineDispatcher] implementations, as [CoroutineDispatcher] ensures that the
 * debugging facilities in the [newCoroutineContext] function work properly.
 *
 * ## Predefined dispatchers
 *
 * The following standard implementations are provided by `kotlinx.coroutines` as properties on
 * the [Dispatchers] object:
 *
 * - [Dispatchers.Default] is used by all standard builders if no dispatcher or any other [ContinuationInterceptor]
 *   is specified in their context.
 *   It uses a common pool of shared background threads.
 *   This is an appropriate choice for compute-intensive coroutines that consume CPU resources.
 * - `Dispatchers.IO` (available on the JVM and Native targets)
 *   uses a shared pool of on-demand created threads and is designed for offloading of IO-intensive _blocking_
 *   operations (like file I/O and blocking socket I/O).
 * - [Dispatchers.Main] represents the UI thread if one is available.
 * - [Dispatchers.Unconfined] starts coroutine execution in the current call-frame until the first suspension,
 *   at which point the coroutine builder function returns.
 *   When the coroutine is resumed, the thread from which it is resumed will run the coroutine code until the next
 *   suspension, and so on.
 *   **The `Unconfined` dispatcher should not normally be used in code**.
 * - Calling [limitedParallelism] on any dispatcher creates a view of the dispatcher that limits the parallelism
 *   to the given value.
 *   This allows creating private thread pools without spawning new threads.
 *   For example, `Dispatchers.IO.limitedParallelism(4)` creates a dispatcher that allows running at most
 *   4 tasks in parallel, reusing the existing IO dispatcher threads.
 * - When thread pools completely separate from [Dispatchers.Default] and [Dispatchers.IO] are required,
 *   they can be created with `newSingleThreadContext` and `newFixedThreadPoolContext` on the JVM and Native targets.
 * - An arbitrary `java.util.concurrent.Executor` can be converted to a dispatcher with the
 *   `asCoroutineDispatcher` extension function.
 *
 * ## Dispatch procedure
 *
 * Typically, a dispatch procedure is performed as follows:
 *
 * - First, [isDispatchNeeded] is invoked to determine whether the coroutine should be dispatched
 *   or is already in the right context.
 * - If [isDispatchNeeded] returns `true`, the coroutine is dispatched using the [dispatch] method.
 *   It may take a while for the dispatcher to start the task,
 *   but the [dispatch] method itself may return immediately, before the task has even begun to execute.
 * - If no dispatch is needed (which is the case for [Dispatchers.Main.immediate][MainCoroutineDispatcher.immediate]
 *   when already on the main thread and for [Dispatchers.Unconfined]),
 *   [dispatch] is typically not called,
 *   and the coroutine is resumed in the thread performing the dispatch procedure,
 *   forming an event loop to prevent stack overflows.
 *   See [Dispatchers.Unconfined] for a description of event loops.
 *
 * This behavior may be different on the very first dispatch procedure for a given coroutine, depending on the
 * [CoroutineStart] parameter of the coroutine builder.
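 *
 * As an illustration of the procedure above, here is a minimal JVM-only sketch of a dispatcher that records
 * which of the two methods are invoked and forwards each call to a delegate.
 * `RecordingDispatcher` and `recordDispatchSteps` are hypothetical names that are not part of the library,
 * and the exact sequence of recorded calls is an assumption that may vary between library versions:
 *
 * ```kotlin
 * import kotlinx.coroutines.*
 * import java.util.concurrent.CopyOnWriteArrayList
 * import kotlin.coroutines.CoroutineContext
 *
 * // Hypothetical dispatcher for illustration: records each call and forwards it to the delegate.
 * class RecordingDispatcher(private val delegate: CoroutineDispatcher) : CoroutineDispatcher() {
 *     val events = CopyOnWriteArrayList<String>()
 *
 *     override fun isDispatchNeeded(context: CoroutineContext): Boolean {
 *         events.add("isDispatchNeeded")
 *         return delegate.isDispatchNeeded(context)
 *     }
 *
 *     override fun dispatch(context: CoroutineContext, block: Runnable) {
 *         events.add("dispatch")
 *         delegate.dispatch(context, block)
 *     }
 * }
 *
 * // Launches one coroutine on the recording dispatcher and returns the recorded calls.
 * fun recordDispatchSteps(): List<String> = runBlocking {
 *     val dispatcher = RecordingDispatcher(Dispatchers.Default)
 *     launch(dispatcher) { }.join()
 *     dispatcher.events.toList()
 * }
 *
 * fun main() {
 *     println(recordDispatchSteps())
 * }
 * ```
 *
 * With the default start mode and a delegate that always requires a dispatch, the recorded list is expected
 * to begin with the [isDispatchNeeded] check, followed by a [dispatch] call.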
 */
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    /** @suppress */
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
        ContinuationInterceptor,
        { it as? CoroutineDispatcher })

    /**
     * Returns `true` if the execution of the coroutine should be performed with the [dispatch] method.
     * The default behavior for most dispatchers is to return `true`.
     *
     * If this method returns `false`, the coroutine is resumed immediately in the current thread,
     * potentially forming an event-loop to prevent stack overflows.
     * The event loop is an advanced topic and its implications can be found in [Dispatchers.Unconfined] documentation.
     *
     * The [context] parameter represents the context of the coroutine that is being dispatched,
     * or [EmptyCoroutineContext] if a non-coroutine-specific [Runnable] is dispatched instead.
     *
     * A dispatcher can override this method to provide a performance optimization and avoid paying the cost of an unnecessary dispatch.
     * E.g. [MainCoroutineDispatcher.immediate] checks whether we are already in the required UI thread in this method and avoids
     * an additional dispatch when it is not required.
     *
     * While this approach can be more efficient, it is not chosen by default to provide a consistent dispatching behaviour
     * so that users won't observe an unexpected and inconsistent order of events by default.
     *
     * Coroutine builders like [launch][CoroutineScope.launch] and [async][CoroutineScope.async] accept an optional [CoroutineStart]
     * parameter that allows one to choose the [undispatched][CoroutineStart.UNDISPATCHED] behavior to start the coroutine immediately,
     * but to be resumed only in the provided dispatcher.
     *
     * This method should generally be exception-safe.
     * An exception thrown from this method
     * may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
     *
     * @see dispatch
     * @see Dispatchers.Unconfined
     */
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

    /**
     * Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism].
     * The resulting view uses the original dispatcher for execution but with the guarantee that
     * no more than [parallelism] coroutines are executed at the same time.
     *
     * This method does not impose restrictions on the number of views or the total sum of parallelism values:
     * each view controls its own parallelism independently with the guarantee that the effective parallelism
     * of all views cannot exceed the actual parallelism of the original dispatcher.
     *
     * The resulting dispatcher does not guarantee that the coroutines will always be dispatched on the same
     * subset of threads; it only guarantees that at most [parallelism] coroutines are executed at the same time,
     * and reuses threads from the original dispatcher.
     * It does not constitute a resource -- it is a _view_ of the underlying dispatcher that can be thrown away
     * and is not required to be closed.
     *
     * ### Example of usage
     * ```
     * // Background dispatcher for the application
     * val dispatcher = newFixedThreadPoolContext(4, "App Background")
     * // At most 2 threads will be processing images as it is really slow and CPU-intensive
     * val imageProcessingDispatcher = dispatcher.limitedParallelism(2, "Image processor")
     * // At most 3 threads will be processing JSON to avoid image processing starvation
     * val jsonProcessingDispatcher = dispatcher.limitedParallelism(3, "Json processor")
     * // At most 1 thread will be doing IO
     * val fileWriterDispatcher = dispatcher.limitedParallelism(1, "File writer")
     * ```
     * Note how in this example the application has an executor with 4 threads, but the total sum of all limits
     * is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism,
     * and at most 4 threads can exist in the system.
     *
     * Note that this example was structured in such a way that it illustrates the parallelism guarantees.
     * In practice, it is usually better to use `Dispatchers.IO` or [Dispatchers.Default] instead of creating a
     * dedicated background `dispatcher`.
     *
     * ### `limitedParallelism(1)` pattern
     *
     * One of the common patterns is confining the execution of specific tasks to a sequential execution in background
     * with `limitedParallelism(1)` invocation.
     * For that purpose, the implementation guarantees that tasks are executed sequentially and that a happens-before relation
     * is established between them:
     *
     * ```
     * val confined = Dispatchers.Default.limitedParallelism(1, "incrementDispatcher")
     * var counter = 0
     *
     * // Invoked from arbitrary coroutines
     * launch(confined) {
     *     // This increment is sequential and race-free
     *     ++counter
     * }
     * ```
     * Note that there is no guarantee that the underlying system thread will always be the same.
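     *
     * A self-contained sketch of the pattern above, given here only as an illustration:
     * `countWithConfinedDispatcher` is a hypothetical helper that is not part of the library, and the sketch
     * assumes a library version in which `limitedParallelism` is available on [Dispatchers.Default].
     *
     * ```kotlin
     * import kotlinx.coroutines.*
     *
     * // Hypothetical helper for illustration: increments a plain counter from many coroutines.
     * fun countWithConfinedDispatcher(tasks: Int): Int = runBlocking {
     *     val confined = Dispatchers.Default.limitedParallelism(1)
     *     var counter = 0
     *     coroutineScope {
     *         repeat(tasks) {
     *             launch(confined) {
     *                 // Sequential and race-free: at most one of these blocks runs at a time.
     *                 ++counter
     *             }
     *         }
     *     }
     *     // coroutineScope returns only after all launched coroutines have completed.
     *     counter
     * }
     *
     * fun main() {
     *     println(countWithConfinedDispatcher(1_000))
     * }
     * ```
     *
     * Because the increments never overlap, no atomic or lock is needed around `counter` in this sketch.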
     *
     * ### Dispatchers.IO
     *
     * `Dispatchers.IO` is considered _elastic_ for the purposes of limited parallelism -- the sum of
     * views is not restricted by the capacity of `Dispatchers.IO`.
     * It means that it is safe to replace `newFixedThreadPoolContext(nThreads)` with
     * `Dispatchers.IO.limitedParallelism(nThreads)` w.r.t. available number of threads.
     * See `Dispatchers.IO` documentation for more details.
     *
     * ### Restrictions and implementation details
     *
     * The default implementation of `limitedParallelism` does not support direct dispatchers,
     * such as executing the given runnable in place during [dispatch] calls.
     * Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct.
     * For direct dispatchers, it is recommended to override this method
     * and provide a domain-specific implementation or to throw an [UnsupportedOperationException].
     *
     * Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement.
     * For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded.
     *
     * @param name optional name for the resulting dispatcher string representation if a new dispatcher was created.
     *        Implementations are free to ignore this parameter.
     * @throws IllegalArgumentException if the given [parallelism] is non-positive
     * @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views
     */
    public open fun limitedParallelism(parallelism: Int, name: String? = null): CoroutineDispatcher {
        parallelism.checkParallelism()
        return LimitedDispatcher(this, parallelism, name)
    }

    // Was experimental since 1.6.0, deprecated since 1.8.x
    @Deprecated("Deprecated for good. Override 'limitedParallelism(parallelism: Int, name: String?)' instead",
        level = DeprecationLevel.HIDDEN,
        replaceWith = ReplaceWith("limitedParallelism(parallelism, null)")
    )
    public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher = limitedParallelism(parallelism, null)

    /**
     * Requests execution of a runnable [block].
     * The dispatcher guarantees that [block] will eventually execute, typically by dispatching it to a thread pool,
     * using a dedicated thread, or just executing the block in place.
     * The [context] parameter represents the context of the coroutine that is being dispatched,
     * or [EmptyCoroutineContext] if a non-coroutine-specific [Runnable] is dispatched instead.
     * Implementations may use [context] for additional context-specific information,
     * such as priority, whether the dispatched coroutine can be invoked in place,
     * coroutine name, and additional diagnostic elements.
     *
     * This method should guarantee that the given [block] will be eventually invoked,
     * otherwise the system may reach a deadlock state and never leave it.
     * The cancellation mechanism is transparent for [CoroutineDispatcher] and is managed by [block] internals.
     *
     * This method should generally be exception-safe. An exception thrown from this method
     * may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
     *
     * This method must not immediately call [block]. Doing so may result in `StackOverflowError`
     * when `dispatch` is invoked repeatedly, for example when [yield] is called in a loop.
     * In order to execute a block in place, it is required to return `false` from [isDispatchNeeded]
     * and delegate the `dispatch` implementation to `Dispatchers.Unconfined.dispatch` in such cases.
     * To support this, the coroutines machinery ensures in-place execution and forms an event-loop to
     * avoid unbounded recursion.
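     *
     * The following JVM-only sketch illustrates a dispatcher whose `dispatch` never calls the block itself.
     * It is a simplified variation and an assumption of this example, not the library's implementation:
     * it returns `false` from [isDispatchNeeded] when already on its worker thread and always enqueues in `dispatch`,
     * instead of delegating to `Dispatchers.Unconfined.dispatch`.
     * `WorkerThreadDispatcher` and `runOnWorker` are hypothetical names.
     *
     * ```kotlin
     * import kotlinx.coroutines.*
     * import java.util.concurrent.Executors
     * import java.util.concurrent.atomic.AtomicReference
     * import kotlin.coroutines.CoroutineContext
     *
     * // Hypothetical single-threaded dispatcher, for illustration only.
     * class WorkerThreadDispatcher : CoroutineDispatcher() {
     *     private val worker = AtomicReference<Thread?>(null)
     *     private val executor = Executors.newSingleThreadExecutor { task ->
     *         Thread(task, "worker").also {
     *             it.isDaemon = true
     *             worker.set(it)
     *         }
     *     }
     *
     *     // No dispatch is needed when the caller is already on the worker thread.
     *     override fun isDispatchNeeded(context: CoroutineContext): Boolean =
     *         Thread.currentThread() !== worker.get()
     *
     *     // Never runs the block in place: it is always handed over to the executor queue.
     *     override fun dispatch(context: CoroutineContext, block: Runnable) {
     *         executor.execute(block)
     *     }
     *
     *     fun shutdown() {
     *         executor.shutdown()
     *     }
     * }
     *
     * // Runs a block on the worker thread and returns the name of the thread that executed it.
     * fun runOnWorker(): String {
     *     val dispatcher = WorkerThreadDispatcher()
     *     try {
     *         return runBlocking {
     *             withContext(dispatcher) { Thread.currentThread().name }
     *         }
     *     } finally {
     *         dispatcher.shutdown()
     *     }
     * }
     * ```
     *
     * In this sketch, code that is already running on the worker thread is resumed without a dispatch,
     * while every call to `dispatch` goes through the executor queue, so `dispatch` cannot recurse into the block.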
     *
     * @see isDispatchNeeded
     * @see Dispatchers.Unconfined
     */
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)

    /**
     * Dispatches execution of a runnable `block` onto another thread in the given `context`
     * with a hint for the dispatcher that the current dispatch is triggered by a [yield] call, so that the execution of this
     * continuation may be delayed in favor of already dispatched coroutines.
     *
     * Though the `yield` marker may be passed as a part of [context], this
     * is a separate method for performance reasons.
     *
     * Implementation note: this entry-point is used for `Dispatchers.IO` and [Dispatchers.Default]
     * underlying implementations, see overrides for this method.
     *
     * @suppress **This is an internal API and should not be used from general code.**
     */
    @InternalCoroutinesApi
    public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = safeDispatch(context, block)

    /**
     * Returns a continuation that wraps the provided [continuation], thus intercepting all resumptions.
     *
     * This method should generally be exception-safe. An exception thrown from this method
     * may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
     */
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

    public final override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        /*
         * Unconditional cast is safe here: we return only DispatchedContinuation from `interceptContinuation`,
         * any ClassCastException can only indicate a compiler bug
         */
        val dispatched = continuation as DispatchedContinuation<*>
        dispatched.release()
    }

    /**
     * @suppress **Error**: Operator '+' on two CoroutineDispatcher objects is meaningless.
     * CoroutineDispatcher is a coroutine context element and `+` is a set-sum operator for coroutine contexts.
     * The dispatcher to the right of `+` just replaces the dispatcher to the left.
     */
    @Suppress("DeprecatedCallableAddReplaceWith")
    @Deprecated(
        message = "Operator '+' on two CoroutineDispatcher objects is meaningless. " +
            "CoroutineDispatcher is a coroutine context element and `+` is a set-sum operator for coroutine contexts. " +
            "The dispatcher to the right of `+` just replaces the dispatcher to the left.",
        level = DeprecationLevel.ERROR
    )
    public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other

    /** @suppress for nicer debugging */
    override fun toString(): String = "$classSimpleName@$hexAddress"
}