1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.jvm.*
10
11 /**
12 * Non-cancellable dispatch mode.
13 *
14 * **DO NOT CHANGE THE CONSTANT VALUE**. It might be inlined into legacy user code that was calling
15 * inline `suspendAtomicCancellableCoroutine` function and did not support reuse.
16 */
17 internal const val MODE_ATOMIC = 0
18
19 /**
20 * Cancellable dispatch mode. It is used by user-facing [suspendCancellableCoroutine].
21 * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension.
22 *
23 * **DO NOT CHANGE THE CONSTANT VALUE**. It is being into the user code from [suspendCancellableCoroutine].
24 */
25 @PublishedApi
26 internal const val MODE_CANCELLABLE: Int = 1
27
28 /**
29 * Cancellable dispatch mode for [suspendCancellableCoroutineReusable].
30 * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension;
31 * implementation of reuse checks mode via [Int.isReusableMode] extension.
32 */
33 internal const val MODE_CANCELLABLE_REUSABLE = 2
34
35 /**
36 * Undispatched mode for [CancellableContinuation.resumeUndispatched].
37 * It is used when the thread is right, but it needs to be marked with the current coroutine.
38 */
39 internal const val MODE_UNDISPATCHED = 4
40
41 /**
42 * Initial mode for [DispatchedContinuation] implementation, should never be used for dispatch, because it is always
43 * overwritten when continuation is resumed with the actual resume mode.
44 */
45 internal const val MODE_UNINITIALIZED = -1
46
47 internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE || this == MODE_CANCELLABLE_REUSABLE
48 internal val Int.isReusableMode get() = this == MODE_CANCELLABLE_REUSABLE
49
50 @PublishedApi
51 internal abstract class DispatchedTask<in T> internal constructor(
52 // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
53 @JvmField public var resumeMode: Int
54 ) : SchedulerTask() {
55 internal abstract val delegate: Continuation<T>
56
takeStatenull57 internal abstract fun takeState(): Any?
58
59 /**
60 * Called when this task was cancelled while it was being dispatched.
61 */
62 internal open fun cancelCompletedResult(takenState: Any?, cause: Throwable) {}
63
64 /**
65 * There are two implementations of `DispatchedTask`:
66 * * [DispatchedContinuation] keeps only simple values as successfully results.
67 * * [CancellableContinuationImpl] keeps additional data with values and overrides this method to unwrap it.
68 */
69 @Suppress("UNCHECKED_CAST")
getSuccessfulResultnull70 internal open fun <T> getSuccessfulResult(state: Any?): T =
71 state as T
72
73 /**
74 * There are two implementations of `DispatchedTask`:
75 * * [DispatchedContinuation] is just an intermediate storage that stores the exception that has its stack-trace
76 * properly recovered and is ready to pass to the [delegate] continuation directly.
77 * * [CancellableContinuationImpl] stores raw cause of the failure in its state; when it needs to be dispatched
78 * its stack-trace has to be recovered, so it overrides this method for that purpose.
79 */
80 internal open fun getExceptionalResult(state: Any?): Throwable? =
81 (state as? CompletedExceptionally)?.cause
82
83 final override fun run() {
84 assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
85 val taskContext = this.taskContext
86 var fatalException: Throwable? = null
87 try {
88 val delegate = delegate as DispatchedContinuation<T>
89 val continuation = delegate.continuation
90 withContinuationContext(continuation, delegate.countOrElement) {
91 val context = continuation.context
92 val state = takeState() // NOTE: Must take state in any case, even if cancelled
93 val exception = getExceptionalResult(state)
94 /*
95 * Check whether continuation was originally resumed with an exception.
96 * If so, it dominates cancellation, otherwise the original exception
97 * will be silently lost.
98 */
99 val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
100 if (job != null && !job.isActive) {
101 val cause = job.getCancellationException()
102 cancelCompletedResult(state, cause)
103 continuation.resumeWithStackTrace(cause)
104 } else {
105 if (exception != null) {
106 continuation.resumeWithException(exception)
107 } else {
108 continuation.resume(getSuccessfulResult(state))
109 }
110 }
111 }
112 } catch (e: Throwable) {
113 // This instead of runCatching to have nicer stacktrace and debug experience
114 fatalException = e
115 } finally {
116 val result = runCatching { taskContext.afterTask() }
117 handleFatalException(fatalException, result.exceptionOrNull())
118 }
119 }
120
121 /**
122 * Machinery that handles fatal exceptions in kotlinx.coroutines.
123 * There are two kinds of fatal exceptions:
124 *
125 * 1) Exceptions from kotlinx.coroutines code. Such exceptions indicate that either
126 * the library or the compiler has a bug that breaks internal invariants.
127 * They usually have specific workarounds, but require careful study of the cause and should
128 * be reported to the maintainers and fixed on the library's side anyway.
129 *
130 * 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext].
131 * While a user code can trigger such exception by providing an improper implementation of [ThreadContextElement],
132 * we can't ignore it because it may leave coroutine in the inconsistent state.
133 * If you encounter such exception, you can either disable this context element or wrap it into
134 * another context element that catches all exceptions and handles it in the application specific manner.
135 *
136 * Fatal exception handling can be intercepted with [CoroutineExceptionHandler] element in the context of
137 * a failed coroutine, but such exceptions should be reported anyway.
138 */
handleFatalExceptionnull139 internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
140 if (exception === null && finallyException === null) return
141 if (exception !== null && finallyException !== null) {
142 exception.addSuppressedThrowable(finallyException)
143 }
144
145 val cause = exception ?: finallyException
146 val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
147 "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause!!)
148 handleCoroutineException(this.delegate.context, reason)
149 }
150 }
151
dispatchnull152 internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
153 assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method
154 val delegate = this.delegate
155 val undispatched = mode == MODE_UNDISPATCHED
156 if (!undispatched && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
157 // dispatch directly using this instance's Runnable implementation
158 val dispatcher = delegate.dispatcher
159 val context = delegate.context
160 if (dispatcher.isDispatchNeeded(context)) {
161 dispatcher.dispatch(context, this)
162 } else {
163 resumeUnconfined()
164 }
165 } else {
166 // delegate is coming from 3rd-party interceptor implementation (and does not support cancellation)
167 // or undispatched mode was requested
168 resume(delegate, undispatched)
169 }
170 }
171
resumenull172 internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatched: Boolean) {
173 // This resume is never cancellable. The result is always delivered to delegate continuation.
174 val state = takeState()
175 val exception = getExceptionalResult(state)
176 val result = if (exception != null) Result.failure(exception) else Result.success(getSuccessfulResult<T>(state))
177 when {
178 undispatched -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
179 else -> delegate.resumeWith(result)
180 }
181 }
182
resumeUnconfinednull183 private fun DispatchedTask<*>.resumeUnconfined() {
184 val eventLoop = ThreadLocalEventLoop.eventLoop
185 if (eventLoop.isUnconfinedLoopActive) {
186 // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
187 eventLoop.dispatchUnconfined(this)
188 } else {
189 // Was not active -- run event loop until all unconfined tasks are executed
190 runUnconfinedEventLoop(eventLoop) {
191 resume(delegate, undispatched = true)
192 }
193 }
194 }
195
runUnconfinedEventLoopnull196 internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
197 eventLoop: EventLoop,
198 block: () -> Unit
199 ) {
200 eventLoop.incrementUseCount(unconfined = true)
201 try {
202 block()
203 while (true) {
204 // break when all unconfined continuations where executed
205 if (!eventLoop.processUnconfinedEvent()) break
206 }
207 } catch (e: Throwable) {
208 /*
209 * This exception doesn't happen normally, only if we have a bug in implementation.
210 * Report it as a fatal exception.
211 */
212 handleFatalException(e, null)
213 } finally {
214 eventLoop.decrementUseCount(unconfined = true)
215 }
216 }
217
218 @Suppress("NOTHING_TO_INLINE")
resumeWithStackTracenull219 internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
220 resumeWith(Result.failure(recoverStackTrace(exception, this)))
221 }
222