1 package kotlinx.coroutines
2
3 import kotlinx.coroutines.internal.*
4 import kotlin.coroutines.*
5 import kotlin.jvm.*
6
/**
 * Dispatch mode that does not honour cancellation.
 *
 * **DO NOT CHANGE THE CONSTANT VALUE**. It might have been inlined into legacy user code that called the
 * inline `suspendAtomicCancellableCoroutine` function and did not support reuse.
 */
internal const val MODE_ATOMIC: Int = 0

/**
 * Cancellable dispatch mode, the one used by the user-facing [suspendCancellableCoroutine].
 * Code that needs to know whether a mode is cancellable tests it with the [Int.isCancellableMode] extension.
 *
 * **DO NOT CHANGE THE CONSTANT VALUE**. It is being inlined into the user code from [suspendCancellableCoroutine].
 */
@PublishedApi
internal const val MODE_CANCELLABLE: Int = 1

/**
 * Cancellable dispatch mode used by [suspendCancellableCoroutineReusable].
 * Cancellability is tested with the [Int.isCancellableMode] extension and
 * reusability with the [Int.isReusableMode] extension.
 */
internal const val MODE_CANCELLABLE_REUSABLE: Int = 2

/**
 * Undispatched mode used by [CancellableContinuation.resumeUndispatched].
 * It applies when the current thread is already the right one, but it still has to be marked with the
 * current coroutine.
 */
internal const val MODE_UNDISPATCHED: Int = 4

/**
 * Initial mode of the [DispatchedContinuation] implementation. It should never be used for dispatch, because
 * it is always overwritten with the actual resume mode when the continuation is resumed.
 */
internal const val MODE_UNINITIALIZED: Int = -1

/** `true` when this mode is [MODE_CANCELLABLE] or [MODE_CANCELLABLE_REUSABLE]. */
internal val Int.isCancellableMode: Boolean
    get() = when (this) {
        MODE_CANCELLABLE, MODE_CANCELLABLE_REUSABLE -> true
        else -> false
    }

/** `true` only when this mode is [MODE_CANCELLABLE_REUSABLE]. */
internal val Int.isReusableMode: Boolean
    get() = MODE_CANCELLABLE_REUSABLE == this
45
/**
 * A [SchedulerTask] that, when [run], takes its stored state and hands it over to the continuation
 * wrapped by its [delegate].
 *
 * @param resumeMode one of the `MODE_*` constants; [run] asserts that it is not [MODE_UNINITIALIZED].
 */
internal abstract class DispatchedTask<in T> internal constructor(
    @JvmField var resumeMode: Int
) : SchedulerTask() {
    /** The continuation that receives the result of this task. */
    internal abstract val delegate: Continuation<T>

    /** Returns the state to be delivered; [run] always calls it, even when the job is cancelled. */
    internal abstract fun takeState(): Any?

    /**
     * Invoked by [run] when the task turned out to be cancelled while it was being dispatched.
     * The default implementation does nothing.
     */
    internal open fun cancelCompletedResult(takenState: Any?, cause: Throwable) {}

    /**
     * Extracts the successful value from the taken [state].
     *
     * There are two implementations of `DispatchedTask`:
     * - [DispatchedContinuation] keeps only simple values as successful results.
     * - [CancellableContinuationImpl] keeps additional data with values and overrides this method to unwrap it.
     */
    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T {
        return state as T
    }

    /**
     * Extracts the failure from the taken [state], or returns `null` when the state is not a
     * [CompletedExceptionally].
     *
     * There are two implementations of `DispatchedTask`:
     * - [DispatchedContinuation] is just an intermediate storage that stores the exception that has its
     *   stack-trace properly recovered and is ready to pass to the [delegate] continuation directly.
     * - [CancellableContinuationImpl] stores raw cause of the failure in its state; when it needs to be
     *   dispatched its stack-trace has to be recovered, so it overrides this method for that purpose.
     */
    internal open fun getExceptionalResult(state: Any?): Throwable? =
        if (state is CompletedExceptionally) state.cause else null

    /**
     * Takes the state and resumes the underlying continuation with it.
     *
     * A [DispatchException] thrown along the way has its cause passed to `handleCoroutineException`;
     * any other [Throwable] goes to [handleFatalException].
     */
    final override fun run() {
        // The resume mode should have been set before dispatching.
        assert { resumeMode != MODE_UNINITIALIZED }
        try {
            val dispatched = delegate as DispatchedContinuation<T>
            val target = dispatched.continuation
            withContinuationContext(target, dispatched.countOrElement) {
                val context = target.context
                // NOTE: the state must be taken in any case, even if the job is cancelled.
                val taken = takeState()
                val failure = getExceptionalResult(taken)
                /*
                 * If the continuation was originally resumed with an exception, that exception
                 * dominates cancellation; otherwise it would be silently lost. Hence the job is
                 * only consulted when there is no exception and the resume mode is cancellable.
                 */
                val job = when {
                    failure != null -> null
                    resumeMode.isCancellableMode -> context[Job]
                    else -> null
                }
                when {
                    job != null && !job.isActive -> {
                        val cancellation = job.getCancellationException()
                        cancelCompletedResult(taken, cancellation)
                        target.resumeWithStackTrace(cancellation)
                    }
                    failure != null -> target.resumeWithException(failure)
                    else -> target.resume(getSuccessfulResult(taken))
                }
            }
        } catch (e: DispatchException) {
            handleCoroutineException(delegate.context, e.cause)
        } catch (e: Throwable) {
            handleFatalException(e)
        }
    }

    /**
     * Machinery that handles fatal exceptions in kotlinx.coroutines.
     * Fatal exceptions come in two kinds:
     *
     * 1) Exceptions from kotlinx.coroutines code. They indicate that either the library or the compiler
     *    has a bug that breaks internal invariants. They usually have specific workarounds, but require
     *    careful study of the cause and should be reported to the maintainers and fixed on the library's
     *    side anyway.
     *
     * 2) Exceptions from [ThreadContextElement.updateThreadContext] and
     *    [ThreadContextElement.restoreThreadContext]. User code can trigger such an exception by providing
     *    an improper implementation of [ThreadContextElement], yet it cannot be ignored because it may
     *    leave the coroutine in an inconsistent state. If you encounter such an exception, you can either
     *    disable this context element or wrap it into another context element that catches all exceptions
     *    and handles them in an application-specific manner.
     *
     * Fatal exception handling can be intercepted with a [CoroutineExceptionHandler] element in the context
     * of a failed coroutine, but such exceptions should be reported anyway.
     */
    internal fun handleFatalException(exception: Throwable) {
        val message = "Fatal exception in coroutines machinery for $this. " +
            "Please read KDoc to 'handleFatalException' method and report this incident to maintainers"
        val reason = CoroutinesInternalError(message, exception)
        handleCoroutineException(delegate.context, reason)
    }
}
135
/**
 * Delivers the result of this task to its delegate according to [mode].
 *
 * The task is resumed directly through [resume] when [mode] is [MODE_UNDISPATCHED], when the delegate is
 * not a [DispatchedContinuation], or when the cancellability of [mode] differs from that of the task's
 * `resumeMode`. Otherwise the delegate's dispatcher decides: the task is either dispatched or resumed
 * unconfined.
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
    // MODE_UNINITIALIZED is an invalid mode value for this method.
    assert { mode != MODE_UNINITIALIZED }
    val target = this.delegate
    val undispatched = mode == MODE_UNDISPATCHED
    if (undispatched ||
        target !is DispatchedContinuation<*> ||
        mode.isCancellableMode != resumeMode.isCancellableMode
    ) {
        // The delegate is coming from a 3rd-party interceptor implementation (and does not support
        // cancellation), or undispatched mode was requested.
        resume(target, undispatched)
        return
    }
    // Dispatch directly using this instance's Runnable implementation.
    val dispatcher = target.dispatcher
    val context = target.context
    when {
        dispatcher.safeIsDispatchNeeded(context) -> dispatcher.safeDispatch(context, this)
        else -> resumeUnconfined()
    }
}
155
/**
 * Takes the state of this task and delivers it to [delegate] as a [Result].
 *
 * This resume is never cancellable: the result is always delivered to the delegate continuation.
 *
 * @param delegate the continuation to resume.
 * @param undispatched when `true`, [delegate] is cast to [DispatchedContinuation] and resumed through
 * `resumeUndispatchedWith`; otherwise it is resumed through `resumeWith`.
 */
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatched: Boolean) {
    val taken = takeState()
    val failure = getExceptionalResult(taken)
    val outcome: Result<T> =
        if (failure == null) Result.success(getSuccessfulResult<T>(taken)) else Result.failure(failure)
    if (undispatched) {
        (delegate as DispatchedContinuation).resumeUndispatchedWith(outcome)
    } else {
        delegate.resumeWith(outcome)
    }
}
166
/**
 * Resumes this task on the thread-local event loop.
 *
 * If an unconfined loop is already active, the task is queued on it; otherwise the task is resumed
 * right away inside a freshly started unconfined loop.
 */
private fun DispatchedTask<*>.resumeUnconfined() {
    val loop = ThreadLocalEventLoop.eventLoop
    if (!loop.isUnconfinedLoopActive) {
        // No loop was active -- run the event loop until all unconfined tasks are executed.
        runUnconfinedEventLoop(loop) {
            resume(delegate, undispatched = true)
        }
        return
    }
    // An unconfined loop is active -- dispatch the continuation for execution to avoid stack overflow.
    loop.dispatchUnconfined(this)
}
179
/**
 * Runs [block] and then keeps processing unconfined events of [eventLoop] until
 * `processUnconfinedEvent` reports that there is nothing left.
 *
 * The unconfined use count of [eventLoop] is incremented before and decremented after, whatever the
 * outcome. Any [Throwable] raised in between is passed to `handleFatalException`.
 */
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        while (eventLoop.processUnconfinedEvent()) {
            // keep going until all unconfined continuations were executed
        }
    } catch (e: Throwable) {
        /*
         * This exception doesn't happen normally, only if we have a bug in implementation.
         * Report it as a fatal exception.
         */
        handleFatalException(e)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}
201
/**
 * Resumes this continuation with a failure holding the result of `recoverStackTrace`
 * applied to [exception] and this continuation.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}
206
/**
 * Wraps an exception raised in the [CoroutineDispatcher.dispatch] method.
 *
 * An unexpected failure of a dispatcher method is most likely a user-induced programmatic bug,
 * such as calling `executor.close()` prematurely. Such exceptions are handled on a separate code path
 * so that they are not reported as fatal errors. See also #4091.
 *
 * @param cause the exception thrown by the dispatcher.
 * @param dispatcher the dispatcher that threw; it only appears in the message.
 * @param context the coroutine context of the failed dispatch; it only appears in the message.
 * @see safeDispatch
 */
internal class DispatchException(
    override val cause: Throwable,
    dispatcher: CoroutineDispatcher,
    context: CoroutineContext
) : Exception(
    "Coroutine dispatcher $dispatcher threw an exception, context = $context",
    cause
)
220