• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.jvm.*
10 
/**
 * Dispatch mode that is not cancellable.
 *
 * **DO NOT CHANGE THE CONSTANT VALUE**. Legacy user code that called the inline
 * `suspendAtomicCancellableCoroutine` function (which did not support reuse) may have this value inlined.
 */
internal const val MODE_ATOMIC = 0

/**
 * Dispatch mode that is cancellable. The user-facing [suspendCancellableCoroutine] relies on it.
 * The cancellability implementation recognizes this mode through the [Int.isCancellableMode] extension.
 *
 * **DO NOT CHANGE THE CONSTANT VALUE**. It is being inlined into the user code from [suspendCancellableCoroutine].
 */
@PublishedApi
internal const val MODE_CANCELLABLE: Int = 1

/**
 * Cancellable dispatch mode used by [suspendCancellableCoroutineReusable].
 * The cancellability implementation recognizes this mode through the [Int.isCancellableMode] extension,
 * and the reuse implementation recognizes it through the [Int.isReusableMode] extension.
 */
internal const val MODE_CANCELLABLE_REUSABLE = 2

/**
 * Undispatched mode used by [CancellableContinuation.resumeUndispatched].
 * It applies when the current thread is already the right one, but it still has to be marked
 * with the current coroutine.
 */
internal const val MODE_UNDISPATCHED = 4

/**
 * Initial mode of the [DispatchedContinuation] implementation. It should never be used for dispatch, because
 * it is always overwritten with the actual resume mode when the continuation is resumed.
 */
internal const val MODE_UNINITIALIZED = -1

/** `true` for [MODE_CANCELLABLE] and [MODE_CANCELLABLE_REUSABLE], `false` for every other value. */
internal val Int.isCancellableMode: Boolean
    get() = when (this) {
        MODE_CANCELLABLE, MODE_CANCELLABLE_REUSABLE -> true
        else -> false
    }

/** `true` only for [MODE_CANCELLABLE_REUSABLE]. */
internal val Int.isReusableMode: Boolean
    get() = MODE_CANCELLABLE_REUSABLE == this
49 
/**
 * Schedulable task that hands a stored resumption state over to its [delegate] continuation.
 *
 * Subclasses provide the [delegate] and the stored state (see [takeState]); [run] unwraps that state and
 * resumes the underlying continuation with a value, an exception, or a cancellation cause.
 */
@PublishedApi
internal abstract class DispatchedTask<in T> internal constructor(
    // Read reflectively by the IDEA debugger, so it must stay binary-compatible, see KTIJ-24102
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    /** Continuation that receives the outcome of this task. */
    internal abstract val delegate: Continuation<T>

    /** Hands out the stored state that is later unwrapped into a successful or exceptional result. */
    internal abstract fun takeState(): Any?

    /**
     * Invoked when this task turned out to be cancelled while it was being dispatched.
     * The default implementation does nothing.
     */
    internal open fun cancelCompletedResult(takenState: Any?, cause: Throwable) {}

    /**
     * Extracts the successful value from the taken [state].
     *
     * `DispatchedTask` has two implementations:
     * * [DispatchedContinuation] keeps only simple values as successful results.
     * * [CancellableContinuationImpl] keeps additional data along with values and overrides this method to unwrap it.
     */
    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T {
        return state as T
    }

    /**
     * Extracts the failure from the taken [state], or returns `null` when the state is not exceptional.
     *
     * `DispatchedTask` has two implementations:
     * * [DispatchedContinuation] is merely an intermediate storage: the exception it holds already has a properly
     *   recovered stack-trace and can be passed to the [delegate] continuation as is.
     * * [CancellableContinuationImpl] keeps the raw cause of the failure in its state; its stack-trace has to be
     *   recovered at dispatch time, which is why it overrides this method.
     */
    internal open fun getExceptionalResult(state: Any?): Throwable? =
        if (state is CompletedExceptionally) state.cause else null

    /**
     * Takes the stored state and resumes the continuation wrapped by [delegate] (which is cast to
     * [DispatchedContinuation]) with it. Any throwable escaping this machinery, as well as a failure of
     * `afterTask`, is routed to [handleFatalException].
     */
    final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // the mode must have been assigned before dispatching
        val scheduledContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val dispatched = delegate as DispatchedContinuation<T>
            val target = dispatched.continuation
            withContinuationContext(target, dispatched.countOrElement) {
                val context = target.context
                val state = takeState() // NOTE: the state is taken unconditionally, even if cancelled
                val failure = getExceptionalResult(state)
                /*
                 * A continuation that was originally resumed with an exception is not checked for cancellation:
                 * the exception dominates, otherwise it would be silently lost.
                 */
                val job = if (failure == null && resumeMode.isCancellableMode) context[Job] else null
                when {
                    job != null && !job.isActive -> {
                        val cause = job.getCancellationException()
                        cancelCompletedResult(state, cause)
                        target.resumeWithStackTrace(cause)
                    }
                    failure != null -> target.resumeWithException(failure)
                    else -> target.resume(getSuccessfulResult(state))
                }
            }
        } catch (e: Throwable) {
            // Plain try/catch rather than runCatching: nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val afterTaskFailure = runCatching { scheduledContext.afterTask() }.exceptionOrNull()
            handleFatalException(fatalException, afterTaskFailure)
        }
    }

    /**
     * Machinery that handles fatal exceptions in kotlinx.coroutines.
     * Fatal exceptions come in two kinds:
     *
     * 1) Exceptions from kotlinx.coroutines code. They indicate that either the library or the compiler
     *    has a bug that breaks internal invariants. Specific workarounds usually exist, but the cause
     *    requires careful study, and the problem should be reported to the maintainers and fixed
     *    on the library's side anyway.
     *
     * 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext].
     *    User code can trigger them with an improper implementation of [ThreadContextElement], yet they
     *    cannot be ignored because the coroutine may be left in an inconsistent state.
     *    If you encounter such an exception, either disable this context element or wrap it into another
     *    context element that catches all exceptions and handles them in an application-specific manner.
     *
     * Fatal exception handling can be intercepted with a [CoroutineExceptionHandler] element in the context of
     * the failed coroutine, but such exceptions should be reported anyway.
     *
     * When both arguments are `null` this method does nothing. When both are present, [finallyException]
     * is attached to [exception] as suppressed and [exception] is reported as the cause.
     */
    internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
        // Nothing failed -- nothing to report
        val cause = exception ?: finallyException ?: return
        if (exception !== null && finallyException !== null) {
            exception.addSuppressedThrowable(finallyException)
        }

        val message = "Fatal exception in coroutines machinery for $this. " +
            "Please read KDoc to 'handleFatalException' method and report this incident to maintainers"
        val reason = CoroutinesInternalError(message, cause)
        handleCoroutineException(delegate.context, reason)
    }
}
151 
/**
 * Delivers the state of this task to its delegate according to the requested [mode].
 *
 * The task itself is submitted to the delegate's dispatcher (or resumed through [resumeUnconfined] when no
 * dispatch is needed) only if all of the following hold: [mode] is not [MODE_UNDISPATCHED], the delegate is a
 * [DispatchedContinuation], and [mode] has the same cancellability as the task's `resumeMode`.
 * In every other case the delegate is resumed right away via [resume].
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
    assert { mode != MODE_UNINITIALIZED } // this method does not accept the uninitialized mode
    val target = this.delegate
    val undispatched = mode == MODE_UNDISPATCHED
    if (undispatched || target !is DispatchedContinuation<*> || mode.isCancellableMode != resumeMode.isCancellableMode) {
        // Either the delegate comes from a 3rd-party interceptor implementation (and does not support
        // cancellation), or undispatched mode was requested
        resume(target, undispatched)
        return
    }
    // Dispatch directly, using this instance's Runnable implementation
    val dispatcher = target.dispatcher
    val context = target.context
    if (dispatcher.isDispatchNeeded(context)) {
        dispatcher.dispatch(context, this)
    } else {
        resumeUnconfined()
    }
}
171 
/**
 * Takes the state of this task and delivers it to [delegate] as a [Result].
 *
 * This resume is never cancellable: the result always reaches the delegate continuation.
 * When [undispatched] is `true`, [delegate] is cast to [DispatchedContinuation] and resumed through
 * `resumeUndispatchedWith`; otherwise it is resumed with a plain `resumeWith`.
 */
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatched: Boolean) {
    val taken = takeState()
    val failure = getExceptionalResult(taken)
    val outcome = if (failure == null) Result.success(getSuccessfulResult<T>(taken)) else Result.failure(failure)
    if (undispatched) {
        (delegate as DispatchedContinuation).resumeUndispatchedWith(outcome)
    } else {
        delegate.resumeWith(outcome)
    }
}
182 
/**
 * Resumes this task on the current thread through the thread-local event loop.
 *
 * If an unconfined loop is already active, the task is queued into it; otherwise the task is resumed
 * immediately and the loop is then run until no unconfined tasks remain.
 */
private fun DispatchedTask<*>.resumeUnconfined() {
    val loop = ThreadLocalEventLoop.eventLoop
    if (!loop.isUnconfinedLoopActive) {
        // No active loop yet -- resume here and keep running the event loop until all unconfined tasks are executed
        runUnconfinedEventLoop(loop) {
            resume(delegate, undispatched = true)
        }
        return
    }
    // An unconfined loop is already active -- queue the continuation for execution to avoid stack overflow
    loop.dispatchUnconfined(this)
}
195 
/**
 * Runs [block] and then processes unconfined events of [eventLoop] until none are left.
 *
 * The unconfined use count of [eventLoop] is incremented before [block] and always decremented afterwards.
 * Any throwable thrown in between is reported through `handleFatalException` instead of being rethrown.
 */
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        // Keep going until all unconfined continuations were executed
        while (eventLoop.processUnconfinedEvent()) {
            // the work is done by the loop condition itself
        }
    } catch (e: Throwable) {
        /*
         * Normally this never happens; an exception here means there is a bug in the implementation,
         * so it is reported as a fatal one.
         */
        handleFatalException(e, null)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}
217 
/**
 * Resumes this continuation with a failure holding [exception] after it has been passed
 * through `recoverStackTrace` for this continuation.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}
222