• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines
2 
3 import kotlinx.coroutines.internal.*
4 import kotlin.coroutines.*
5 
/**
 * Base class to be extended by all coroutine dispatcher implementations.
 *
 * If `kotlinx-coroutines` is used, it is recommended to avoid [ContinuationInterceptor] instances that are not
 * [CoroutineDispatcher] implementations, as [CoroutineDispatcher] ensures that the
 * debugging facilities in the [newCoroutineContext] function work properly.
 *
 * ## Predefined dispatchers
 *
 * The following standard implementations are provided by `kotlinx.coroutines` as properties on
 * the [Dispatchers] object:
 *
 * - [Dispatchers.Default] is used by all standard builders if no dispatcher or any other [ContinuationInterceptor]
 *   is specified in their context.
 *   It uses a common pool of shared background threads.
 *   This is an appropriate choice for compute-intensive coroutines that consume CPU resources.
 * - `Dispatchers.IO` (available on the JVM and Native targets)
 *   uses a shared pool of on-demand created threads and is designed for offloading of IO-intensive _blocking_
 *   operations (like file I/O and blocking socket I/O).
 * - [Dispatchers.Main] represents the UI thread if one is available.
 * - [Dispatchers.Unconfined] starts coroutine execution in the current call-frame until the first suspension,
 *   at which point the coroutine builder function returns.
 *   When the coroutine is resumed, the thread from which it is resumed will run the coroutine code until the next
 *   suspension, and so on.
 *   **The `Unconfined` dispatcher should not normally be used in code**.
 * - Calling [limitedParallelism] on any dispatcher creates a view of the dispatcher that limits the parallelism
 *   to the given value.
 *   This allows creating private thread pools without spawning new threads.
 *   For example, `Dispatchers.IO.limitedParallelism(4)` creates a dispatcher that allows running at most
 *   4 tasks in parallel, reusing the existing IO dispatcher threads.
 * - When thread pools completely separate from [Dispatchers.Default] and [Dispatchers.IO] are required,
 *   they can be created with `newSingleThreadContext` and `newFixedThreadPoolContext` on the JVM and Native targets.
 * - An arbitrary `java.util.concurrent.Executor` can be converted to a dispatcher with the
 *   `asCoroutineDispatcher` extension function.
 *
 * ## Dispatch procedure
 *
 * Typically, a dispatch procedure is performed as follows:
 *
 * - First, [isDispatchNeeded] is invoked to determine whether the coroutine should be dispatched
 *   or is already in the right context.
 * - If [isDispatchNeeded] returns `true`, the coroutine is dispatched using the [dispatch] method.
 *   It may take a while for the dispatcher to start the task,
 *   but the [dispatch] method itself may return immediately, before the task has even begun to execute.
 * - If no dispatch is needed (which is the case for [Dispatchers.Main.immediate][MainCoroutineDispatcher.immediate]
 *   when already on the main thread and for [Dispatchers.Unconfined]),
 *   [dispatch] is typically not called,
 *   and the coroutine is resumed in the thread performing the dispatch procedure,
 *   forming an event loop to prevent stack overflows.
 *   See [Dispatchers.Unconfined] for a description of event loops.
 *
 * This behavior may be different on the very first dispatch procedure for a given coroutine, depending on the
 * [CoroutineStart] parameter of the coroutine builder.
 */
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    /** @suppress */
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
        ContinuationInterceptor,
        // Safe cast: an element that is not a CoroutineDispatcher yields `null`.
        { element -> element as? CoroutineDispatcher }
    )

    /**
     * Tells whether the execution of the coroutine should go through the [dispatch] method.
     * Most dispatchers return `true`, which is also the default implemented here.
     *
     * When this method returns `false`, the coroutine is resumed immediately in the current thread,
     * potentially forming an event loop to prevent stack overflows.
     * The event loop is an advanced topic; its implications are described in the [Dispatchers.Unconfined]
     * documentation.
     *
     * The [context] parameter represents the context of the coroutine that is being dispatched,
     * or [EmptyCoroutineContext] if a non-coroutine-specific [Runnable] is dispatched instead.
     *
     * A dispatcher can override this method as a performance optimization that avoids the cost of
     * an unnecessary dispatch.
     * E.g. [MainCoroutineDispatcher.immediate] checks in this method whether we are already in the required UI thread
     * and avoids an additional dispatch when it is not required.
     *
     * While this approach can be more efficient, it is not chosen by default, so that dispatching behaviour
     * stays consistent and users do not observe an unexpected and inconsistent order of events by default.
     *
     * Coroutine builders like [launch][CoroutineScope.launch] and [async][CoroutineScope.async] accept an optional
     * [CoroutineStart] parameter that allows one to choose the [undispatched][CoroutineStart.UNDISPATCHED] behavior:
     * the coroutine is started immediately, but is resumed only in the provided dispatcher.
     *
     * This method should generally be exception-safe. An exception thrown from this method
     * may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
     *
     * @see dispatch
     * @see Dispatchers.Unconfined
     */
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return true
    }

    /**
     * Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism].
     * The resulting view uses the original dispatcher for execution but with the guarantee that
     * no more than [parallelism] coroutines are executed at the same time.
     *
     * This method does not impose restrictions on the number of views or the total sum of parallelism values:
     * each view controls its own parallelism independently, with the guarantee that the effective parallelism
     * of all views cannot exceed the actual parallelism of the original dispatcher.
     *
     * The resulting dispatcher does not guarantee that the coroutines will always be dispatched on the same
     * subset of threads; it only guarantees that at most [parallelism] coroutines are executed at the same time,
     * and it reuses threads from the original dispatcher.
     * It does not constitute a resource -- it is a _view_ of the underlying dispatcher that can be thrown away
     * and is not required to be closed.
     *
     * ### Example of usage
     * ```
     * // Background dispatcher for the application
     * val dispatcher = newFixedThreadPoolContext(4, "App Background")
     * // At most 2 threads will be processing images as it is really slow and CPU-intensive
     * val imageProcessingDispatcher = dispatcher.limitedParallelism(2, "Image processor")
     * // At most 3 threads will be processing JSON to avoid image processing starvation
     * val jsonProcessingDispatcher = dispatcher.limitedParallelism(3, "Json processor")
     * // At most 1 thread will be doing IO
     * val fileWriterDispatcher = dispatcher.limitedParallelism(1, "File writer")
     * ```
     * Note how in this example the application has an executor with 4 threads, but the total sum of all limits
     * is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism,
     * and at most 4 threads can exist in the system.
     *
     * Note that this example was structured in such a way that it illustrates the parallelism guarantees.
     * In practice, it is usually better to use `Dispatchers.IO` or [Dispatchers.Default] instead of creating a
     * dedicated background `dispatcher`.
     *
     * ### `limitedParallelism(1)` pattern
     *
     * One of the common patterns is confining the execution of specific tasks to a sequential execution in background
     * with a `limitedParallelism(1)` invocation.
     * For that purpose, the implementation guarantees that tasks are executed sequentially and that
     * a happens-before relation is established between them:
     *
     * ```
     * val confined = Dispatchers.Default.limitedParallelism(1, "incrementDispatcher")
     * var counter = 0
     *
     * // Invoked from arbitrary coroutines
     * launch(confined) {
     *     // This increment is sequential and race-free
     *     ++counter
     * }
     * ```
     * Note that there is no guarantee that the underlying system thread will always be the same.
     *
     * ### Dispatchers.IO
     *
     * `Dispatchers.IO` is considered _elastic_ for the purposes of limited parallelism -- the sum of
     * views is not restricted by the capacity of `Dispatchers.IO`.
     * It means that it is safe to replace `newFixedThreadPoolContext(nThreads)` with
     * `Dispatchers.IO.limitedParallelism(nThreads)` w.r.t. available number of threads.
     * See `Dispatchers.IO` documentation for more details.
     *
     * ### Restrictions and implementation details
     *
     * The default implementation of `limitedParallelism` does not support direct dispatchers,
     * such as executing the given runnable in place during [dispatch] calls.
     * Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct.
     * For direct dispatchers, it is recommended to override this method
     * and provide a domain-specific implementation or to throw an [UnsupportedOperationException].
     *
     * Implementations of this method are allowed to return `this` if the current dispatcher already satisfies
     * the parallelism requirement.
     * For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher
     * is already single-threaded.
     *
     * @param name optional name for the resulting dispatcher string representation if a new dispatcher was created.
     *        Implementations are free to ignore this parameter.
     * @throws IllegalArgumentException if the given [parallelism] is non-positive
     * @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views
     */
    public open fun limitedParallelism(parallelism: Int, name: String? = null): CoroutineDispatcher {
        // Validate the requested limit before the view is constructed.
        parallelism.checkParallelism()
        val view: CoroutineDispatcher = LimitedDispatcher(this, parallelism, name)
        return view
    }

    // Was experimental since 1.6.0, deprecated since 1.8.x
    @Deprecated(
        "Deprecated for good. Override 'limitedParallelism(parallelism: Int, name: String?)' instead",
        level = DeprecationLevel.HIDDEN,
        replaceWith = ReplaceWith("limitedParallelism(parallelism, null)")
    )
    public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        // Forwards to the two-argument overload with no name.
        return limitedParallelism(parallelism, null)
    }

    /**
     * Requests execution of a runnable [block].
     * The dispatcher guarantees that [block] will eventually execute, typically by dispatching it to a thread pool,
     * using a dedicated thread, or just executing the block in place.
     * The [context] parameter represents the context of the coroutine that is being dispatched,
     * or [EmptyCoroutineContext] if a non-coroutine-specific [Runnable] is dispatched instead.
     * Implementations may use [context] for additional context-specific information,
     * such as priority, whether the dispatched coroutine can be invoked in place,
     * coroutine name, and additional diagnostic elements.
     *
     * This method should guarantee that the given [block] will be eventually invoked,
     * otherwise the system may reach a deadlock state and never leave it.
     * The cancellation mechanism is transparent for [CoroutineDispatcher] and is managed by [block] internals.
     *
     * This method should generally be exception-safe. An exception thrown from this method
     * may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
     *
     * This method must not immediately call [block]. Doing so may result in `StackOverflowError`
     * when `dispatch` is invoked repeatedly, for example when [yield] is called in a loop.
     * In order to execute a block in place, it is required to return `false` from [isDispatchNeeded]
     * and delegate the `dispatch` implementation to `Dispatchers.Unconfined.dispatch` in such cases.
     * To support this, the coroutines machinery ensures in-place execution and forms an event-loop to
     * avoid unbound recursion.
     *
     * @see isDispatchNeeded
     * @see Dispatchers.Unconfined
     */
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)

    /**
     * Dispatches execution of a runnable `block` onto another thread in the given `context`
     * with a hint for the dispatcher that the current dispatch is triggered by a [yield] call,
     * so that the execution of this continuation may be delayed in favor of already dispatched coroutines.
     *
     * Though the `yield` marker may be passed as a part of [context], this
     * is a separate method for performance reasons.
     *
     * Implementation note: this entry-point is used for `Dispatchers.IO` and [Dispatchers.Default]
     * underlying implementations, see overrides for this method.
     *
     * @suppress **This is an internal API and should not be used from general code.**
     */
    @InternalCoroutinesApi
    public open fun dispatchYield(context: CoroutineContext, block: Runnable) {
        safeDispatch(context, block)
    }

    /**
     * Returns a continuation that wraps the provided [continuation], thus intercepting all resumptions.
     *
     * This method should generally be exception-safe. An exception thrown from this method
     * may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
     */
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        return DispatchedContinuation(this, continuation)
    }

    public final override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        /*
         * The unconditional cast is safe here: `interceptContinuation` only ever returns DispatchedContinuation,
         * so a ClassCastException can only indicate a compiler bug.
         */
        (continuation as DispatchedContinuation<*>).release()
    }

    /**
     * @suppress **Error**: Operator '+' on two CoroutineDispatcher objects is meaningless.
     * CoroutineDispatcher is a coroutine context element and `+` is a set-sum operator for coroutine contexts.
     * The dispatcher to the right of `+` just replaces the dispatcher to the left.
     */
    @Suppress("DeprecatedCallableAddReplaceWith")
    @Deprecated(
        message = "Operator '+' on two CoroutineDispatcher objects is meaningless. " +
            "CoroutineDispatcher is a coroutine context element and `+` is a set-sum operator for coroutine contexts. " +
            "The dispatcher to the right of `+` just replaces the dispatcher to the left.",
        level = DeprecationLevel.ERROR
    )
    public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher {
        return other
    }

    /** @suppress for nicer debugging */
    override fun toString(): String {
        return "${classSimpleName}@${hexAddress}"
    }
}
268