1 /*
2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.jvm.*
10
/**
 * Dispatch mode without cancellation support.
 *
 * **DO NOT CHANGE THE CONSTANT VALUE**. Legacy user code that called the inline
 * `suspendAtomicCancellableCoroutine` function (which did not support reuse) might have this value inlined.
 */
internal const val MODE_ATOMIC: Int = 0
18
/**
 * Dispatch mode with cancellation support. The user-facing [suspendCancellableCoroutine] uses it.
 * Note that the implementation decides whether a mode is cancellable via the [Int.isCancellableMode] extension.
 *
 * **DO NOT CHANGE THE CONSTANT VALUE**. It is inlined into user code from [suspendCancellableCoroutine].
 */
@PublishedApi
internal const val MODE_CANCELLABLE: Int = 1
27
/**
 * Dispatch mode with cancellation support that is used by [suspendCancellableCoroutineReusable].
 * Note that the implementation decides whether a mode is cancellable via the [Int.isCancellableMode] extension
 * and whether it is reusable via the [Int.isReusableMode] extension.
 */
internal const val MODE_CANCELLABLE_REUSABLE: Int = 2
34
/**
 * Undispatched mode used by [CancellableContinuation.resumeUndispatched].
 * It applies when execution is already on the right thread, but that thread still has to be marked
 * with the current coroutine.
 */
internal const val MODE_UNDISPATCHED: Int = 4
40
/**
 * Initial mode of the [DispatchedContinuation] implementation. It should never be used for an actual dispatch,
 * because it is always overwritten with the real resume mode when the continuation is resumed.
 */
internal const val MODE_UNINITIALIZED: Int = -1
46
/**
 * `true` when this mode is one of the cancellable modes:
 * [MODE_CANCELLABLE] or [MODE_CANCELLABLE_REUSABLE].
 */
internal val Int.isCancellableMode: Boolean
    get() = when (this) {
        MODE_CANCELLABLE, MODE_CANCELLABLE_REUSABLE -> true
        else -> false
    }
/**
 * `true` only when this mode is [MODE_CANCELLABLE_REUSABLE].
 */
internal val Int.isReusableMode: Boolean
    get() = this == MODE_CANCELLABLE_REUSABLE
49
/**
 * A schedulable task that delivers a previously stored resumption state to its [delegate] continuation.
 *
 * @param resumeMode the mode this task is resumed with; [run] asserts that it is not [MODE_UNINITIALIZED].
 */
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    /** The continuation that this task resumes. */
    internal abstract val delegate: Continuation<T>

    /**
     * Returns the state that is subsequently decoded with [getExceptionalResult] and [getSuccessfulResult].
     */
    internal abstract fun takeState(): Any?

    /**
     * Hook that [run] invokes when the task turns out to be cancelled while it was being dispatched.
     * The default implementation does nothing.
     */
    internal open fun cancelCompletedResult(takenState: Any?, cause: Throwable) {}

    /**
     * Extracts the successful value from [state].
     *
     * `DispatchedTask` has two implementations:
     * * [DispatchedContinuation] keeps only plain values as successful results.
     * * [CancellableContinuationImpl] stores extra data next to the value and overrides this method to unwrap it.
     */
    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T {
        return state as T
    }

    /**
     * Extracts the failure from [state], or returns `null` when [state] is not a [CompletedExceptionally].
     *
     * `DispatchedTask` has two implementations:
     * * [DispatchedContinuation] is merely an intermediate storage: the exception it holds already has its
     *   stack-trace recovered and can be passed to the [delegate] continuation as is.
     * * [CancellableContinuationImpl] keeps the raw cause of the failure in its state; the stack-trace has to be
     *   recovered at dispatch time, which is why it overrides this method.
     */
    internal open fun getExceptionalResult(state: Any?): Throwable? =
        if (state is CompletedExceptionally) state.cause else null

    public final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // the mode must be assigned before the task is dispatched
        val taskContext = this.taskContext
        var failure: Throwable? = null
        try {
            val dispatched = delegate as DispatchedContinuation<T>
            val target = dispatched.continuation
            val context = target.context
            val state = takeState() // NOTE: the state must be taken unconditionally, even when cancelled
            withCoroutineContext(context, dispatched.countOrElement) {
                val exception = getExceptionalResult(state)
                /*
                 * The job is consulted only when the continuation was NOT originally resumed with an exception:
                 * an original exception dominates cancellation, otherwise it would be silently lost.
                 */
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                when {
                    job != null && !job.isActive -> {
                        val cause = job.getCancellationException()
                        cancelCompletedResult(state, cause)
                        target.resumeWithStackTrace(cause)
                    }
                    exception != null -> target.resumeWithException(exception)
                    else -> target.resume(getSuccessfulResult(state))
                }
            }
        } catch (e: Throwable) {
            // A plain try/catch instead of runCatching gives a nicer stacktrace and debug experience
            failure = e
        } finally {
            val afterTaskOutcome = runCatching { taskContext.afterTask() }
            handleFatalException(failure, afterTaskOutcome.exceptionOrNull())
        }
    }

    /**
     * Machinery that deals with fatal exceptions in kotlinx.coroutines.
     * Fatal exceptions come in two kinds:
     *
     * 1) Exceptions thrown by kotlinx.coroutines code itself. They indicate that either the library or
     *    the compiler has a bug that breaks internal invariants.
     *    Specific workarounds usually exist, but the cause requires careful study and the problem should
     *    be reported to the maintainers and fixed on the library's side anyway.
     *
     * 2) Exceptions thrown by [ThreadContextElement.updateThreadContext] and
     *    [ThreadContextElement.restoreThreadContext].
     *    User code can trigger such an exception with an improper [ThreadContextElement] implementation,
     *    yet it cannot be ignored because it may leave the coroutine in an inconsistent state.
     *    If you run into such an exception, either disable the context element or wrap it into another
     *    context element that catches all exceptions and handles them in an application-specific manner.
     *
     * Fatal exception handling can be intercepted with a [CoroutineExceptionHandler] element in the context of
     * the failed coroutine, but such exceptions should be reported anyway.
     *
     * When both arguments are `null` this method does nothing. When both are present, [finallyException] is
     * added to [exception] as suppressed and [exception] is reported as the cause.
     */
    internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
        // Nothing to report when neither the task nor its finalization has failed
        val cause = exception ?: finallyException ?: return
        if (exception != null && finallyException != null) {
            exception.addSuppressedThrowable(finallyException)
        }

        val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
            "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause)
        handleCoroutineException(this.delegate.context, reason)
    }
}
149
/**
 * Delivers the result of this task to its delegate using the requested [mode].
 *
 * The task is dispatched through the delegate's dispatcher only when undispatched mode was not requested,
 * the delegate is a [DispatchedContinuation], and [mode] agrees with the task's `resumeMode` on cancellability.
 * In every other case the delegate is resumed directly via [resume].
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
    assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method
    val target = this.delegate
    val isUndispatched = mode == MODE_UNDISPATCHED
    if (isUndispatched ||
        target !is DispatchedContinuation<*> ||
        mode.isCancellableMode != resumeMode.isCancellableMode
    ) {
        // the delegate comes from a 3rd-party interceptor implementation (and does not support cancellation)
        // or undispatched mode was requested
        resume(target, isUndispatched)
        return
    }
    // dispatch directly, using this instance's Runnable implementation
    val dispatcher = target.dispatcher
    val context = target.context
    if (dispatcher.isDispatchNeeded(context)) {
        dispatcher.dispatch(context, this)
    } else {
        resumeUnconfined()
    }
}
169
/**
 * Takes the state of this task and hands it to [delegate] as a [Result].
 *
 * This resume is never cancellable: the result is always delivered to the delegate continuation.
 *
 * @param delegate the continuation to resume; it is cast to [DispatchedContinuation] when [undispatched] is `true`.
 * @param undispatched when `true` the result is passed via `resumeUndispatchedWith`,
 * otherwise via plain `resumeWith`.
 */
@Suppress("UNCHECKED_CAST")
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatched: Boolean) {
    val taken = takeState()
    val failure = getExceptionalResult(taken)
    val outcome: Result<T> =
        if (failure == null) Result.success(getSuccessfulResult<T>(taken)) else Result.failure(failure)
    if (undispatched) {
        (delegate as DispatchedContinuation).resumeUndispatchedWith(outcome)
    } else {
        delegate.resumeWith(outcome)
    }
}
181
/**
 * Resumes this task on the current thread's event loop without going through a dispatcher.
 */
private fun DispatchedTask<*>.resumeUnconfined() {
    val loop = ThreadLocalEventLoop.eventLoop
    if (!loop.isUnconfinedLoopActive) {
        // No unconfined loop is running yet -- resume here and run the loop until all unconfined tasks are executed
        runUnconfinedEventLoop(loop) {
            resume(delegate, undispatched = true)
        }
        return
    }
    // An unconfined loop is already active -- enqueue the task for it instead of recursing, to avoid stack overflow
    loop.dispatchUnconfined(this)
}
194
/**
 * Runs [block] and then processes unconfined events of [eventLoop] until none are left.
 *
 * The unconfined use count of [eventLoop] is incremented before [block] runs and is decremented
 * when this function finishes. Any [Throwable] thrown along the way is reported through
 * `handleFatalException` and is not rethrown.
 */
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        // keep draining until all unconfined continuations were executed
        do {
            val processed = eventLoop.processUnconfinedEvent()
        } while (processed)
    } catch (e: Throwable) {
        /*
         * Normally this never happens; an exception here means there is a bug in the implementation.
         * Report it as a fatal exception.
         */
        handleFatalException(e, null)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}
216
/**
 * Resumes this continuation with a failure, passing [exception] through `recoverStackTrace`
 * for this continuation first.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}
221