• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines
2 
3 import kotlinx.coroutines.internal.*
4 import kotlin.coroutines.*
5 import kotlin.jvm.*
6 
7 /**
8  * Non-cancellable dispatch mode.
9  *
10  * **DO NOT CHANGE THE CONSTANT VALUE**. It might be inlined into legacy user code that was calling
11  * inline `suspendAtomicCancellableCoroutine` function and did not support reuse.
12  */
13 internal const val MODE_ATOMIC = 0
14 
15 /**
16  * Cancellable dispatch mode. It is used by user-facing [suspendCancellableCoroutine].
17  * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension.
18  *
19  * **DO NOT CHANGE THE CONSTANT VALUE**. It is being into the user code from [suspendCancellableCoroutine].
20  */
21 @PublishedApi
22 internal const val MODE_CANCELLABLE: Int = 1
23 
24 /**
25  * Cancellable dispatch mode for [suspendCancellableCoroutineReusable].
26  * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension;
27  * implementation of reuse checks mode via [Int.isReusableMode] extension.
28  */
29 internal const val MODE_CANCELLABLE_REUSABLE = 2
30 
31 /**
32  * Undispatched mode for [CancellableContinuation.resumeUndispatched].
33  * It is used when the thread is right, but it needs to be marked with the current coroutine.
34  */
35 internal const val MODE_UNDISPATCHED = 4
36 
37 /**
38  * Initial mode for [DispatchedContinuation] implementation, should never be used for dispatch, because it is always
39  * overwritten when continuation is resumed with the actual resume mode.
40  */
41 internal const val MODE_UNINITIALIZED = -1
42 
43 internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE || this == MODE_CANCELLABLE_REUSABLE
44 internal val Int.isReusableMode get() = this == MODE_CANCELLABLE_REUSABLE
45 
46 internal abstract class DispatchedTask<in T> internal constructor(
47     @JvmField var resumeMode: Int
48 ) : SchedulerTask() {
49     internal abstract val delegate: Continuation<T>
50 
takeStatenull51     internal abstract fun takeState(): Any?
52 
53     /**
54      * Called when this task was cancelled while it was being dispatched.
55      */
56     internal open fun cancelCompletedResult(takenState: Any?, cause: Throwable) {}
57 
58     /**
59      * There are two implementations of `DispatchedTask`:
60      * - [DispatchedContinuation] keeps only simple values as successfully results.
61      * - [CancellableContinuationImpl] keeps additional data with values and overrides this method to unwrap it.
62      */
63     @Suppress("UNCHECKED_CAST")
getSuccessfulResultnull64     internal open fun <T> getSuccessfulResult(state: Any?): T =
65         state as T
66 
67     /**
68      * There are two implementations of `DispatchedTask`:
69      * - [DispatchedContinuation] is just an intermediate storage that stores the exception that has its stack-trace
70      *   properly recovered and is ready to pass to the [delegate] continuation directly.
71      * - [CancellableContinuationImpl] stores raw cause of the failure in its state; when it needs to be dispatched
72      *   its stack-trace has to be recovered, so it overrides this method for that purpose.
73      */
74     internal open fun getExceptionalResult(state: Any?): Throwable? =
75         (state as? CompletedExceptionally)?.cause
76 
77     final override fun run() {
78         assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
79         try {
80             val delegate = delegate as DispatchedContinuation<T>
81             val continuation = delegate.continuation
82             withContinuationContext(continuation, delegate.countOrElement) {
83                 val context = continuation.context
84                 val state = takeState() // NOTE: Must take state in any case, even if cancelled
85                 val exception = getExceptionalResult(state)
86                 /*
87                  * Check whether continuation was originally resumed with an exception.
88                  * If so, it dominates cancellation, otherwise the original exception
89                  * will be silently lost.
90                  */
91                 val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
92                 if (job != null && !job.isActive) {
93                     val cause = job.getCancellationException()
94                     cancelCompletedResult(state, cause)
95                     continuation.resumeWithStackTrace(cause)
96                 } else {
97                     if (exception != null) {
98                         continuation.resumeWithException(exception)
99                     } else {
100                         continuation.resume(getSuccessfulResult(state))
101                     }
102                 }
103             }
104         } catch (e: DispatchException) {
105             handleCoroutineException(delegate.context, e.cause)
106         } catch (e: Throwable) {
107             handleFatalException(e)
108         }
109     }
110 
111     /**
112      * Machinery that handles fatal exceptions in kotlinx.coroutines.
113      * There are two kinds of fatal exceptions:
114      *
115      * 1) Exceptions from kotlinx.coroutines code. Such exceptions indicate that either
116      *    the library or the compiler has a bug that breaks internal invariants.
117      *    They usually have specific workarounds, but require careful study of the cause and should
118      *    be reported to the maintainers and fixed on the library's side anyway.
119      *
120      * 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext].
121      *    While a user code can trigger such exception by providing an improper implementation of [ThreadContextElement],
122      *    we can't ignore it because it may leave coroutine in the inconsistent state.
123      *    If you encounter such exception, you can either disable this context element or wrap it into
124      *    another context element that catches all exceptions and handles it in the application specific manner.
125      *
126      * Fatal exception handling can be intercepted with [CoroutineExceptionHandler] element in the context of
127      * a failed coroutine, but such exceptions should be reported anyway.
128      */
handleFatalExceptionnull129     internal fun handleFatalException(exception: Throwable) {
130         val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
131                 "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", exception)
132         handleCoroutineException(this.delegate.context, reason)
133     }
134 }
135 
dispatchnull136 internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
137     assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method
138     val delegate = this.delegate
139     val undispatched = mode == MODE_UNDISPATCHED
140     if (!undispatched && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
141         // dispatch directly using this instance's Runnable implementation
142         val dispatcher = delegate.dispatcher
143         val context = delegate.context
144         if (dispatcher.safeIsDispatchNeeded(context)) {
145             dispatcher.safeDispatch(context, this)
146         } else {
147             resumeUnconfined()
148         }
149     } else {
150         // delegate is coming from 3rd-party interceptor implementation (and does not support cancellation)
151         // or undispatched mode was requested
152         resume(delegate, undispatched)
153     }
154 }
155 
resumenull156 internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatched: Boolean) {
157     // This resume is never cancellable. The result is always delivered to delegate continuation.
158     val state = takeState()
159     val exception = getExceptionalResult(state)
160     val result = if (exception != null) Result.failure(exception) else Result.success(getSuccessfulResult<T>(state))
161     when {
162         undispatched -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
163         else -> delegate.resumeWith(result)
164     }
165 }
166 
resumeUnconfinednull167 private fun DispatchedTask<*>.resumeUnconfined() {
168     val eventLoop = ThreadLocalEventLoop.eventLoop
169     if (eventLoop.isUnconfinedLoopActive) {
170         // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
171         eventLoop.dispatchUnconfined(this)
172     } else {
173         // Was not active -- run event loop until all unconfined tasks are executed
174         runUnconfinedEventLoop(eventLoop) {
175             resume(delegate, undispatched = true)
176         }
177     }
178 }
179 
runUnconfinedEventLoopnull180 internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
181     eventLoop: EventLoop,
182     block: () -> Unit
183 ) {
184     eventLoop.incrementUseCount(unconfined = true)
185     try {
186         block()
187         while (true) {
188             // break when all unconfined continuations where executed
189             if (!eventLoop.processUnconfinedEvent()) break
190         }
191     } catch (e: Throwable) {
192         /*
193          * This exception doesn't happen normally, only if we have a bug in implementation.
194          * Report it as a fatal exception.
195          */
196         handleFatalException(e)
197     } finally {
198         eventLoop.decrementUseCount(unconfined = true)
199     }
200 }
201 
202 @Suppress("NOTHING_TO_INLINE")
resumeWithStackTracenull203 internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
204     resumeWith(Result.failure(recoverStackTrace(exception, this)))
205 }
206 
207 /**
208  * This exception holds an exception raised in [CoroutineDispatcher.dispatch] method.
209  * When dispatcher methods fail unexpectedly, it is likely a user-induced programmatic bug,
210  * such as calling `executor.close()` prematurely. To avoid reporting such exceptions as fatal errors,
211  * we handle them with a separate code path. See also #4091.
212  *
213  * @see safeDispatch
214  */
215 internal class DispatchException(
216     override val cause: Throwable,
217     dispatcher: CoroutineDispatcher,
218     context: CoroutineContext,
219 ) : Exception("Coroutine dispatcher $dispatcher threw an exception, context = $context", cause)
220